You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/01 10:35:25 UTC
[pulsar] 01/02: [improve][java-client] Support passing existing scheduled executor providers to the client (#16334)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bb4f1e011666c00c0ce54c252e7e179c58202ae0
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Jul 2 06:29:16 2022 +0800
[improve][java-client] Support passing existing scheduled executor providers to the client (#16334)
* [improve][java-client] Support passing existing scheduled executor providers to the client
### Motivation
#16236 introduced a new scheduled executor but does not support passing the existing scheduled executor providers like #12037.
* Apply comment.
(cherry picked from commit f159f62e8017a4286775cfe65632b1415bda9c4b)
---
.../pulsar/client/impl/PulsarClientImpl.java | 35 ++++++++++++++--------
.../pulsar/client/impl/PulsarClientImplTest.java | 10 +++++++
2 files changed, 33 insertions(+), 12 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 9a4bada3278..8fcec401204 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -93,6 +93,8 @@ public class PulsarClientImpl implements PulsarClient {
protected final ClientConfigurationData conf;
private final boolean createdExecutorProviders;
+
+ private final boolean createdScheduledProviders;
private LookupService lookup;
private final ConnectionPool cnxPool;
@Getter
@@ -139,28 +141,29 @@ public class PulsarClientImpl implements PulsarClient {
private TransactionCoordinatorClientImpl tcClient;
public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
- this(conf, null, null, null, null, null);
+ this(conf, null, null, null, null, null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
- this(conf, eventLoopGroup, null, null, null, null);
+ this(conf, eventLoopGroup, null, null, null, null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
- this(conf, eventLoopGroup, cnxPool, null, null, null);
+ this(conf, eventLoopGroup, cnxPool, null, null, null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
Timer timer)
throws PulsarClientException {
- this(conf, eventLoopGroup, cnxPool, timer, null, null);
+ this(conf, eventLoopGroup, cnxPool, timer, null, null, null);
}
@Builder(builderClassName = "PulsarClientImplBuilder")
private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
- ExecutorProvider internalExecutorProvider) throws PulsarClientException {
+ ExecutorProvider internalExecutorProvider,
+ ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException {
EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
try {
@@ -171,9 +174,10 @@ public class PulsarClientImpl implements PulsarClient {
"Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
}
this.createdExecutorProviders = externalExecutorProvider == null;
+ this.createdScheduledProviders = scheduledExecutorProvider == null;
eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
this.eventLoopGroup = eventLoopGroupReference;
- if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
+ if (conf == null || isBlank(conf.getServiceUrl())) {
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
setAuth(conf);
@@ -187,8 +191,8 @@ public class PulsarClientImpl implements PulsarClient {
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
- this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
- "pulsar-client-scheduled");
+ this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider :
+ new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, this.eventLoopGroup);
} else {
@@ -817,8 +821,8 @@ public class PulsarClientImpl implements PulsarClient {
}
private void shutdownExecutors() throws PulsarClientException {
+ PulsarClientException pulsarClientException = null;
if (createdExecutorProviders) {
- PulsarClientException pulsarClientException = null;
if (externalExecutorProvider != null && !externalExecutorProvider.isShutdown()) {
try {
@@ -836,11 +840,18 @@ public class PulsarClientImpl implements PulsarClient {
pulsarClientException = PulsarClientException.unwrap(t);
}
}
-
- if (pulsarClientException != null) {
- throw pulsarClientException;
+ }
+ if (createdScheduledProviders && scheduledExecutorProvider != null && !scheduledExecutorProvider.isShutdown()) {
+ try {
+ externalExecutorProvider.shutdownNow();
+ } catch (Throwable t) {
+ log.warn("Failed to shutdown scheduledExecutorProvider", t);
+ pulsarClientException = PulsarClientException.unwrap(t);
}
}
+ if (pulsarClientException != null) {
+ throw pulsarClientException;
+ }
}
@Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 386294e24b7..a506066df8b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertSame;
@@ -54,6 +55,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -225,16 +227,24 @@ public class PulsarClientImplTest {
ClientConfigurationData conf = clientImpl.conf;
@Cleanup("shutdownNow")
ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor");
+ @Cleanup("shutdownNow")
+ ScheduledExecutorProvider scheduledExecutorProvider =
+ new ScheduledExecutorProvider(2, "scheduled-executor");
@Cleanup
PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf)
.internalExecutorProvider(executorProvider)
.externalExecutorProvider(executorProvider)
+ .scheduledExecutorProvider(scheduledExecutorProvider)
.build();
@Cleanup
PulsarClientImpl client3 = PulsarClientImpl.builder().conf(conf)
.internalExecutorProvider(executorProvider)
.externalExecutorProvider(executorProvider)
+ .scheduledExecutorProvider(scheduledExecutorProvider)
.build();
+
+ assertEquals(client2.getScheduledExecutorProvider(), scheduledExecutorProvider);
+ assertEquals(client3.getScheduledExecutorProvider(), scheduledExecutorProvider);
}
@Test(expectedExceptions = IllegalArgumentException.class,