You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/15 03:13:14 UTC

[GitHub] [pulsar] 315157973 commented on a change in pull request #12037: [Client] Support passing existing executor providers to the client

315157973 commented on a change in pull request #12037:
URL: https://github.com/apache/pulsar/pull/12037#discussion_r708803873



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, getEventLoopGroup(conf), true);
+        this(conf, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
+        this(conf, eventLoopGroup, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, false, false);
+        this(conf, eventLoopGroup, cnxPool, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, false, false);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null);
     }
 
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup)
-            throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true);
-    }
-
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer,
-                             boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException {
+    @Builder(builderClassName = "PulsarClientImplBuilder")
+    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
+                             Timer timer, ExecutorProvider externalExecutorProvider,
+                             ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+        EventLoopGroup eventLoopGroupReference = null;
+        ConnectionPool connectionPoolReference = null;
         try {
-            this.createdEventLoopGroup = createdEventLoopGroup;
-            this.createdCnxPool = createdCnxPool;
-            if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
+            this.createdEventLoopGroup = eventLoopGroup == null;
+            this.createdCnxPool = connectionPool == null;
+            if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
+                throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
+            }
+            this.createdExecutorProviders = externalExecutorProvider == null;
+            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
+            this.eventLoopGroup = eventLoopGroupReference;
+            if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
                 throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
             }
-            this.eventLoopGroup = eventLoopGroup;
             setAuth(conf);
             this.conf = conf;
             clientClock = conf.getClock();
             conf.getAuthentication().start();
-            this.cnxPool = cnxPool;
-            externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
-            internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            connectionPoolReference = connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);
+            this.cnxPool = connectionPoolReference;
+            this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+            this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");

Review comment:
       In my opinion, sharing the internalExecutorProvider is not a good approach. This thread pool is very critical, so it is best to keep it isolated. If any long-running tasks are executed in it, they will affect the consumption performance of the client.
   This thread pool is currently shared by all Producers and Consumers. Usually, when we use the SDK, one process creates only one PulsarClient instance, so not many threads will be created.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org