Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/06 09:28:48 UTC

[GitHub] [pulsar] xche opened a new pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

xche opened a new pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491


   Fixes #8575
   
   ### Motivation
   
   A functionWorker instance has only one PulsarClient, and the externalExecutorProvider thread pool in that PulsarClient has only 1 thread, so all consumers share the same listener executor. As a result, when one or more tasks stop, the listener executor is blocked, and eventually the other tasks stop consuming data as well.
   
   ### Modifications
   
   1. Consumers with the same subscription use the same listener executor
   2. Add a “numListenerThreads” configuration parameter to WorkerConfig
   3. When PulsarSource is closed, clear its queue so that the listener executor is not blocked
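
The blocking described in the motivation can be reproduced without Pulsar at all. The sketch below is only an illustration using `java.util.concurrent`: the class `SharedListenerDemo` and the method `secondListenerRuns` are hypothetical names, and the fixed thread pool merely stands in for the client's listener executor. It submits one listener that never returns and one healthy listener, then reports whether the healthy one got to run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Pulsar.
class SharedListenerDemo {

    /**
     * Submits a listener that blocks until released, then a second listener,
     * and reports whether the second one ran while the first was still blocked.
     */
    static boolean secondListenerRuns(int numListenerThreads) throws InterruptedException {
        ExecutorService listenerExecutor = Executors.newFixedThreadPool(numListenerThreads);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch secondRan = new CountDownLatch(1);
        try {
            // Listener of a stalled task: occupies its thread until released.
            listenerExecutor.submit(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Listener of a healthy task.
            listenerExecutor.submit(secondRan::countDown);
            // Wait briefly to see whether the healthy listener was scheduled.
            return secondRan.await(500, TimeUnit.MILLISECONDS);
        } finally {
            release.countDown();
            listenerExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 listener thread:  " + secondListenerRuns(1));
        System.out.println("4 listener threads: " + secondListenerRuns(4));
    }
}
```

With a single thread the healthy listener stays queued behind the stalled one. With more threads it runs, which is the effect a configurable thread count is meant to achieve.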
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] xche commented on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
xche commented on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-863865600


   > @xche Can you rebase to the latest master?
   
   @sijie rebase done





[GitHub] [pulsar] jerrypeng commented on a change in pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#discussion_r627920821



##########
File path: pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
##########
@@ -79,14 +79,14 @@ public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, Str
                                 String pulsarWebServiceUrl) throws Exception {
         initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig,
                 storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory,
-                rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty());
+                rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty(),1);

Review comment:
       Why are we setting the number of listener threads to one here? I think this config should also be respected when using the other runtimes.







[GitHub] [pulsar] xche removed a comment on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
xche removed a comment on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-839618977


   > @xche you added a config at the worker level but your PR only applies it to the ThreadRuntime and NOT the rest of the runtimes available.
   
   





[GitHub] [pulsar] jerrypeng commented on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-839208015


   @xche you added a config at the worker level but your PR only applies it to the ThreadRuntime and NOT the rest of the runtimes available.





[GitHub] [pulsar] jerrypeng commented on a change in pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#discussion_r627921306



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PushPulsarSource.java
##########
@@ -75,4 +75,11 @@ public void consume(Record<T> record) {
     public int getQueueLength() {
         return DEFAULT_QUEUE_LENGTH;
     }
+
+    /**
+     * Clear the records that have been pushed onto the queue.
+     */
+    public void clear() {

Review comment:
       This code change doesn't seem relevant to this PR
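
For context, the idea behind a `clear()` on a push-style source can be sketched with a bounded queue. This is a minimal illustration under assumptions, not the PR's implementation: `BoundedPushQueue` and `clearUnblocksProducer` are hypothetical names, and the sketch assumes that `consume` blocks on a full queue. Clearing the queue frees capacity, so a listener thread stuck in `consume` can return.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a push-style source backed by a bounded queue.
class BoundedPushQueue<T> {
    private final LinkedBlockingQueue<T> queue;

    BoundedPushQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Called from the listener thread; blocks while the queue is full. */
    void consume(T record) throws InterruptedException {
        queue.put(record);
    }

    /** Drops all buffered records, which frees capacity for a blocked consume(). */
    void clear() {
        queue.clear();
    }

    /** Returns true if a consume() blocked on a full queue completes after clear(). */
    static boolean clearUnblocksProducer() throws InterruptedException {
        BoundedPushQueue<String> source = new BoundedPushQueue<>(1);
        source.consume("first"); // the queue is now full
        Thread listener = new Thread(() -> {
            try {
                source.consume("second"); // blocks until space is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.start();
        listener.join(200); // give the listener time to block
        boolean blockedBefore = listener.isAlive();
        source.clear(); // simulates the clean-up done on close
        listener.join(2000);
        return blockedBefore && !listener.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("clear() unblocked the listener: " + clearUnblocksProducer());
    }
}
```

Note that clearing only releases a listener that is already waiting. If records keep arriving after the clear, the queue fills up and blocks again.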

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -102,7 +102,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.incomingMessages = new GrowableArrayBlockingQueue<>();
         this.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
         this.executorProvider = executorProvider;
-        this.pinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
+        this.pinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor(conf.getSubscriptionName());

Review comment:
       This code change doesn't seem relevant to this PR
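
To make the `getExecutor(conf.getSubscriptionName())` call above easier to follow, here is a minimal sketch of a provider that picks an executor by key. `KeyedExecutorProvider` is a hypothetical class written for illustration and is not Pulsar's `ExecutorProvider`. It assumes a fixed set of single-threaded executors and maps each key to one of them by hash, so the same subscription name always lands on the same executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical provider; the real client class may differ.
class KeyedExecutorProvider {
    private final List<ExecutorService> executors = new ArrayList<>();

    KeyedExecutorProvider(int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        for (int i = 0; i < numThreads; i++) {
            executors.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Returns the executor assigned to the key; equal keys map to the same executor. */
    ExecutorService getExecutor(String key) {
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(key.hashCode(), executors.size());
        return executors.get(index);
    }

    void shutdownNow() {
        executors.forEach(ExecutorService::shutdownNow);
    }

    public static void main(String[] args) {
        KeyedExecutorProvider provider = new KeyedExecutorProvider(4);
        boolean same = provider.getExecutor("sub-a") == provider.getExecutor("sub-a");
        System.out.println("same key, same executor: " + same);
        provider.shutdownNow();
    }
}
```

Different subscription names can still hash to the same executor, so this approach reduces sharing between consumers but does not eliminate it.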







[GitHub] [pulsar] codelipenghui commented on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-848571587


   @xche Could you please help check the failed tests?





[GitHub] [pulsar] sijie commented on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-841637543


   @jerrypeng Can you review this again?





[GitHub] [pulsar] xche commented on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
xche commented on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-839618977


   > @xche you added a config at the worker level but your PR only applies it to the ThreadRuntime and NOT the rest of the runtimes available.
   
   





[GitHub] [pulsar] xche commented on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
xche commented on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-859308465


   > @xche Could you please help check the failed tests?
   
   @codelipenghui The failed tests related to this change have been fixed. Could you please review this PR again?
   





[GitHub] [pulsar] jerrypeng commented on a change in pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#discussion_r630564692



##########
File path: pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
##########
@@ -79,14 +79,14 @@ public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, Str
                                 String pulsarWebServiceUrl) throws Exception {
         initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig,
                 storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory,
-                rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty());
+                rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty(),1);

Review comment:
       This is not resolved







[GitHub] [pulsar] xche commented on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
xche commented on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-835310776


   > There seem to be some changes that don't seem relevant to this PR
   
   It has been dealt with. Please take a look





[GitHub] [pulsar] sijie commented on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-863788273


   @xche Can you rebase to the latest master?





[GitHub] [pulsar] xche commented on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
xche commented on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-833401065


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui commented on pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491#issuecomment-1015003972


   @xche Sorry for the late response. We have fixed a couple of flaky tests in the master branch. Could you please rebase onto the master branch and resolve the conflicts? It would be better to have this one in 2.10.





[GitHub] [pulsar] xche closed pull request #10491: [Functions] Adding “numListenerThreads” configuration parameter in WorkerConfig . #8575

Posted by GitBox <gi...@apache.org>.
xche closed pull request #10491:
URL: https://github.com/apache/pulsar/pull/10491


   

