You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/14 13:45:17 UTC

[GitHub] [pulsar] lhotari opened a new pull request #12037: [Client] Support passing existing executor providers to the client

lhotari opened a new pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037


   ### Motivation
   
   In load and performance testing, there's a need to simulate production use cases and production workloads.
   For this purpose, it would be useful to be able to share the thread pools used by Pulsar client instances in order to be able to run a large amount of Pulsar clients in a single JVM without the overhead of a lot of threads. 
   In the current solution, it's already possible to share the EventLoopGroup and HashedWheelTimer instances.
   The solution for sharing the thread pools for the external / internal executors was missing. This PR adds support for that.
   
   Example usage:
   
   ```java
   // shared thread pool related resources
   ExecutorProvider internalExecutorProvider = new ExecutorProvider(8, "shared-internal-executor");
   ExecutorProvider externalExecutorProvider = new ExecutorProvider(8, "shared-external-executor");
   Timer sharedTimer = new HashedWheelTimer(getThreadFactory("shared-pulsar-timer"), 1, TimeUnit.MILLISECONDS);
   EventLoopGroup sharedEventLoopGroup = new EpollEventLoopGroup();
   
   // example of creating a client which uses the shared thread pools
   PulsarClientImpl client = PulsarClientImpl.builder().conf(conf)
                   .internalExecutorProvider(internalExecutorProvider)
                   .externalExecutorProvider(externalExecutorProvider)
                   .timer(sharedTimer)
                   .eventLoopGroup(sharedEventLoopGroup)
                   .build();
   ```
   
   ### Modifications
   
   Refactor the code to add an internal builder class so that more constructors don't have to be added for advanced initialization of the PulsarClientImpl class.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#issuecomment-1016127456


   @codelipenghui This PR is now ready for final review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#issuecomment-919298722


   > Do you want to use this only inside internal Pulsar tools ? the Proxy, Pulsar perf, Pulsar CLI tools ?
   
   yes, mainly an internal, unsupported API. Pulsar Proxy and Pulsar perf are the main internal use cases.
   
   > Why can't we add this support using the regular PulsarClient Builder API ?
   
   It seems that this would just add unnecessary complexity and extra baggage in the public API. 
   I'd assume that there are very few use cases where there would be a need to specify the thread pools to use.
   
   My use case is in improving [nosqlbench's Pulsar driver](https://github.com/nosqlbench/nosqlbench/tree/main/driver-pulsar). This would allow to emulate thousands of isolated Pulsar clients within a single JVM. When a Pulsar client is shared, it doesn't properly emulate real production work loads.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r708873700



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
+                throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
+            }
+            this.createdExecutorProviders = externalExecutorProvider == null;
+            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
+            this.eventLoopGroup = eventLoopGroupReference;
+            if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
                 throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
             }
-            this.eventLoopGroup = eventLoopGroup;
             setAuth(conf);
             this.conf = conf;
             clientClock = conf.getClock();
             conf.getAuthentication().start();
-            this.cnxPool = cnxPool;
-            externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-            internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            connectionPoolReference = connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);
+            this.cnxPool = connectionPoolReference;
+            this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+            this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");

Review comment:
       My use case is in improving [nosqlbench's Pulsar driver](https://github.com/nosqlbench/nosqlbench/tree/main/driver-pulsar). This would allow to emulate thousands of isolated Pulsar clients within a single JVM. When a Pulsar client is shared, it doesn't properly emulate real production work loads.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r708875770



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
+                throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
+            }
+            this.createdExecutorProviders = externalExecutorProvider == null;
+            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
+            this.eventLoopGroup = eventLoopGroupReference;
+            if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
                 throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
             }
-            this.eventLoopGroup = eventLoopGroup;
             setAuth(conf);
             this.conf = conf;
             clientClock = conf.getClock();
             conf.getAuthentication().start();
-            this.cnxPool = cnxPool;
-            externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-            internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            connectionPoolReference = connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);
+            this.cnxPool = connectionPoolReference;
+            this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+            this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");

Review comment:
       @315157973 The other important use case is the Pulsar Proxy. Please check the PR description for details.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#issuecomment-1015023539


   @lhotari Could you please help resolve the conflicts? It's a nice have in 2.10.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r708803873



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
+                throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
+            }
+            this.createdExecutorProviders = externalExecutorProvider == null;
+            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
+            this.eventLoopGroup = eventLoopGroupReference;
+            if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
                 throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
             }
-            this.eventLoopGroup = eventLoopGroup;
             setAuth(conf);
             this.conf = conf;
             clientClock = conf.getClock();
             conf.getAuthentication().start();
-            this.cnxPool = cnxPool;
-            externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-            internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            connectionPoolReference = connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);
+            this.cnxPool = connectionPoolReference;
+            this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+            this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");

Review comment:
       In my opinion, I don't think that sharing the internalExecutorProvider is a good way. This thread pool is very critical. It is best to isolate the thread pool. If any tasks that take a long time are executed in it, it will affect the consumption performance of the client.
   This thread pool is now shared by all Producers and Consumers. Usually when we use the SDK, one process will only create one PulsarClient instance.There will not be too many threads to create.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r708873489



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
+                throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
+            }
+            this.createdExecutorProviders = externalExecutorProvider == null;
+            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
+            this.eventLoopGroup = eventLoopGroupReference;
+            if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
                 throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
             }
-            this.eventLoopGroup = eventLoopGroup;
             setAuth(conf);
             this.conf = conf;
             clientClock = conf.getClock();
             conf.getAuthentication().start();
-            this.cnxPool = cnxPool;
-            externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-            internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            connectionPoolReference = connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);
+            this.cnxPool = connectionPoolReference;
+            this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+            this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");

Review comment:
       @315157973  I agree with you. Please check the details of my use case. That's what makes the difference.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r786380153



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
+                throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");

Review comment:
       It should be IllegalArgumentException




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r708930965



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
+                throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
+            }
+            this.createdExecutorProviders = externalExecutorProvider == null;
+            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
+            this.eventLoopGroup = eventLoopGroupReference;
+            if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
                 throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
             }
-            this.eventLoopGroup = eventLoopGroup;
             setAuth(conf);
             this.conf = conf;
             clientClock = conf.getClock();
             conf.getAuthentication().start();
-            this.cnxPool = cnxPool;
-            externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-            internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            connectionPoolReference = connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);
+            this.cnxPool = connectionPoolReference;
+            this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+            this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");

Review comment:
       OK, I get your point




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#issuecomment-919661135


   @lhotari Thanks for your contribution. For this PR, do we need to update docs?
   
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r786523841



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
+                throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r708340209



##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
##########
@@ -217,4 +219,32 @@ public void testResourceCleanup() throws PulsarClientException {
             assertFalse(eventLoopGroup.isShutdown());
         }
     }
+
+    @Test
+    public void testInitializingWithExecutorProviders() throws PulsarClientException {
+        ClientConfigurationData conf = clientImpl.conf;
+        @Cleanup("shutdownNow")
+        ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor");
+        @Cleanup
+        PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf)
+                .internalExecutorProvider(executorProvider)
+                .externalExecutorProvider(executorProvider)
+                .build();
+        @Cleanup
+        PulsarClientImpl client3 = PulsarClientImpl.builder().conf(conf)
+                .internalExecutorProvider(executorProvider)
+                .externalExecutorProvider(executorProvider)
+                .build();
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)

Review comment:
       we should add a more narrow verification of the assertion, IllegalStateException may be thrown for other reasons




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r708874554



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
+                throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
+            }
+            this.createdExecutorProviders = externalExecutorProvider == null;
+            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
+            this.eventLoopGroup = eventLoopGroupReference;
+            if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
                 throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
             }
-            this.eventLoopGroup = eventLoopGroup;
             setAuth(conf);
             this.conf = conf;
             clientClock = conf.getClock();
             conf.getAuthentication().start();
-            this.cnxPool = cnxPool;
-            externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-            internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            connectionPoolReference = connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);
+            this.cnxPool = connectionPoolReference;
+            this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+            this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");

Review comment:
       The main reason why it doesn't emulate production work loads is the connection pool and multiplexing over TCP/IP connections that the Pulsar client does for producers and consumers. When emulating production workloads in load testing, that's something that makes a big difference.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#issuecomment-1015188417


   > @lhotari Could you please help resolve the conflicts? It's a nice have in 2.10.
   
   @codelipenghui I have rebased the changes and addressed the review comments. PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r786523574



##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
##########
@@ -217,4 +219,32 @@ public void testResourceCleanup() throws PulsarClientException {
             assertFalse(eventLoopGroup.isShutdown());
         }
     }
+
+    @Test
+    public void testInitializingWithExecutorProviders() throws PulsarClientException {
+        ClientConfigurationData conf = clientImpl.conf;
+        @Cleanup("shutdownNow")
+        ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor");
+        @Cleanup
+        PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf)
+                .internalExecutorProvider(executorProvider)
+                .externalExecutorProvider(executorProvider)
+                .build();
+        @Cleanup
+        PulsarClientImpl client3 = PulsarClientImpl.builder().conf(conf)
+                .internalExecutorProvider(executorProvider)
+                .externalExecutorProvider(executorProvider)
+                .build();
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org