You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/01 10:35:25 UTC

[pulsar] 01/02: [improve][java-client] Support passing existing scheduled executor providers to the client (#16334)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bb4f1e011666c00c0ce54c252e7e179c58202ae0
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Jul 2 06:29:16 2022 +0800

    [improve][java-client] Support passing existing scheduled executor providers to the client (#16334)
    
    * [improve][java-client] Support passing existing scheduled executor providers to the client
    
    ### Motivation
    
    #16236 introduced a new scheduled executor but did not support passing in existing scheduled executor providers, as #12037 does.
    
    * Apply comment.
    
    (cherry picked from commit f159f62e8017a4286775cfe65632b1415bda9c4b)
---
 .../pulsar/client/impl/PulsarClientImpl.java       | 35 ++++++++++++++--------
 .../pulsar/client/impl/PulsarClientImplTest.java   | 10 +++++++
 2 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 9a4bada3278..8fcec401204 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -93,6 +93,8 @@ public class PulsarClientImpl implements PulsarClient {
 
     protected final ClientConfigurationData conf;
     private final boolean createdExecutorProviders;
+
+    private final boolean createdScheduledProviders;
     private LookupService lookup;
     private final ConnectionPool cnxPool;
     @Getter
@@ -139,28 +141,29 @@ public class PulsarClientImpl implements PulsarClient {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, null, null, null, null, null);
+        this(conf, null, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, null, null, null, null);
+        this(conf, eventLoopGroup, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, null, null);
+        this(conf, eventLoopGroup, cnxPool, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
                             Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, null, null);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null, null);
     }
 
     @Builder(builderClassName = "PulsarClientImplBuilder")
     private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
                              Timer timer, ExecutorProvider externalExecutorProvider,
-                             ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+                             ExecutorProvider internalExecutorProvider,
+                             ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException {
         EventLoopGroup eventLoopGroupReference = null;
         ConnectionPool connectionPoolReference = null;
         try {
@@ -171,9 +174,10 @@ public class PulsarClientImpl implements PulsarClient {
                         "Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
             }
             this.createdExecutorProviders = externalExecutorProvider == null;
+            this.createdScheduledProviders = scheduledExecutorProvider == null;
             eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
             this.eventLoopGroup = eventLoopGroupReference;
-            if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
+            if (conf == null || isBlank(conf.getServiceUrl())) {
                 throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
             }
             setAuth(conf);
@@ -187,8 +191,8 @@ public class PulsarClientImpl implements PulsarClient {
                     new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
             this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
                     new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
-            this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
-                    "pulsar-client-scheduled");
+            this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider :
+                    new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled");
             if (conf.getServiceUrl().startsWith("http")) {
                 lookup = new HttpLookupService(conf, this.eventLoopGroup);
             } else {
@@ -817,8 +821,8 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private void shutdownExecutors() throws PulsarClientException {
+        PulsarClientException pulsarClientException = null;
         if (createdExecutorProviders) {
-            PulsarClientException pulsarClientException = null;
 
             if (externalExecutorProvider != null && !externalExecutorProvider.isShutdown()) {
                 try {
@@ -836,11 +840,18 @@ public class PulsarClientImpl implements PulsarClient {
                     pulsarClientException = PulsarClientException.unwrap(t);
                 }
             }
-
-            if (pulsarClientException != null) {
-                throw pulsarClientException;
+        }
+        if (createdScheduledProviders && scheduledExecutorProvider != null && !scheduledExecutorProvider.isShutdown()) {
+            try {
+                scheduledExecutorProvider.shutdownNow();
+            } catch (Throwable t) {
+                log.warn("Failed to shutdown scheduledExecutorProvider", t);
+                pulsarClientException = PulsarClientException.unwrap(t);
             }
         }
+        if (pulsarClientException != null) {
+            throw pulsarClientException;
+        }
     }
 
     @Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 386294e24b7..a506066df8b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotSame;
 import static org.testng.Assert.assertSame;
@@ -54,6 +55,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -225,16 +227,24 @@ public class PulsarClientImplTest {
         ClientConfigurationData conf = clientImpl.conf;
         @Cleanup("shutdownNow")
         ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor");
+        @Cleanup("shutdownNow")
+        ScheduledExecutorProvider scheduledExecutorProvider =
+                new ScheduledExecutorProvider(2, "scheduled-executor");
         @Cleanup
         PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf)
                 .internalExecutorProvider(executorProvider)
                 .externalExecutorProvider(executorProvider)
+                .scheduledExecutorProvider(scheduledExecutorProvider)
                 .build();
         @Cleanup
         PulsarClientImpl client3 = PulsarClientImpl.builder().conf(conf)
                 .internalExecutorProvider(executorProvider)
                 .externalExecutorProvider(executorProvider)
+                .scheduledExecutorProvider(scheduledExecutorProvider)
                 .build();
+
+        assertEquals(client2.getScheduledExecutorProvider(), scheduledExecutorProvider);
+        assertEquals(client3.getScheduledExecutorProvider(), scheduledExecutorProvider);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,