You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/12 16:11:09 UTC
[pulsar] branch master updated: Java Client - Threads to inherit daemon status from thread creating the client (#2770)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c2977be Java Client - Threads to inherit daemon status from thread creating the client (#2770)
c2977be is described below
commit c2977bef98a2189f0aef70a97c76240bfa24a191
Author: Jean-Bernard van Zuylen <jb...@primeoservices.org>
AuthorDate: Fri Oct 12 18:11:05 2018 +0200
Java Client - Threads to inherit daemon status from thread creating the client (#2770)
---
.../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 10 +++++++---
.../java/org/apache/pulsar/client/util/ExecutorProvider.java | 9 ++++-----
2 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index bac6a85..8a8de09 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -137,13 +137,13 @@ public class PulsarClientImpl implements PulsarClient {
this.conf = conf;
conf.getAuthentication().start();
this.cnxPool = cnxPool;
- externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+ externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener"));
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls(), externalExecutorProvider.getExecutor());
}
- timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
+ timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
producers = Maps.newIdentityHashMap();
consumers = Maps.newIdentityHashMap();
state.set(State.Open);
@@ -806,10 +806,14 @@ public class PulsarClientImpl implements PulsarClient {
}
private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
- ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io");
+ ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
}
+ private static ThreadFactory getThreadFactory(String poolName) {
+ return new DefaultThreadFactory(poolName, Thread.currentThread().isDaemon());
+ }
+
void cleanupProducer(ProducerBase<?> producer) {
synchronized (producers) {
producers.remove(producer);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 907c52f..56069f8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -24,24 +24,23 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
public class ExecutorProvider {
private final int numThreads;
private final List<ExecutorService> executors;
private final AtomicInteger currentThread = new AtomicInteger(0);
- public ExecutorProvider(int numThreads, String threadNamePrefix) {
+ public ExecutorProvider(int numThreads, ThreadFactory threadFactory) {
checkArgument(numThreads > 0);
this.numThreads = numThreads;
- checkNotNull(threadNamePrefix);
+ checkNotNull(threadFactory);
executors = Lists.newArrayListWithCapacity(numThreads);
for (int i = 0; i < numThreads; i++) {
- executors.add(Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(threadNamePrefix)));
+ executors.add(Executors.newSingleThreadScheduledExecutor(threadFactory));
}
}