You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/12 16:11:09 UTC

[pulsar] branch master updated: Java Client - Threads to inherit daemon status from thread creating the client (#2770)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c2977be  Java Client - Threads to inherit daemon status from thread creating the client (#2770)
c2977be is described below

commit c2977bef98a2189f0aef70a97c76240bfa24a191
Author: Jean-Bernard van Zuylen <jb...@primeoservices.org>
AuthorDate: Fri Oct 12 18:11:05 2018 +0200

    Java Client - Threads to inherit daemon status from thread creating the client (#2770)
---
 .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java   | 10 +++++++---
 .../java/org/apache/pulsar/client/util/ExecutorProvider.java   |  9 ++++-----
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index bac6a85..8a8de09 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -137,13 +137,13 @@ public class PulsarClientImpl implements PulsarClient {
         this.conf = conf;
         conf.getAuthentication().start();
         this.cnxPool = cnxPool;
-        externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
+        externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener"));
         if (conf.getServiceUrl().startsWith("http")) {
             lookup = new HttpLookupService(conf, eventLoopGroup);
         } else {
             lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls(), externalExecutorProvider.getExecutor());
         }
-        timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
+        timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
         producers = Maps.newIdentityHashMap();
         consumers = Maps.newIdentityHashMap();
         state.set(State.Open);
@@ -806,10 +806,14 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
-        ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io");
+        ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
         return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
     }
 
+    private static ThreadFactory getThreadFactory(String poolName) {
+        return new DefaultThreadFactory(poolName, Thread.currentThread().isDaemon());
+    }
+
     void cleanupProducer(ProducerBase<?> producer) {
         synchronized (producers) {
             producers.remove(producer);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 907c52f..56069f8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -24,24 +24,23 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 public class ExecutorProvider {
     private final int numThreads;
     private final List<ExecutorService> executors;
     private final AtomicInteger currentThread = new AtomicInteger(0);
 
-    public ExecutorProvider(int numThreads, String threadNamePrefix) {
+    public ExecutorProvider(int numThreads, ThreadFactory threadFactory) {
         checkArgument(numThreads > 0);
         this.numThreads = numThreads;
-        checkNotNull(threadNamePrefix);
+        checkNotNull(threadFactory);
         executors = Lists.newArrayListWithCapacity(numThreads);
         for (int i = 0; i < numThreads; i++) {
-            executors.add(Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(threadNamePrefix)));
+            executors.add(Executors.newSingleThreadScheduledExecutor(threadFactory));
         }
     }