You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/03 15:27:28 UTC

[incubator-uniffle] branch master updated: [MINOR][IMPROVEMENT] Set heartBeatExecutorService as daemon thread (#121)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new fd8ccdd  [MINOR][IMPROVEMENT] Set heartBeatExecutorService as daemon thread (#121)
fd8ccdd is described below

commit fd8ccdd920296745b7f27e3f36ed06238b0f274f
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Wed Aug 3 23:27:23 2022 +0800

    [MINOR][IMPROVEMENT] Set heartBeatExecutorService as daemon thread (#121)
    
    ### What changes were proposed in this pull request?
    When the main thread of the shuffleServer exits, the heartbeat thread should exit as well.
    
    ### Why are the changes needed?
    More formal.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No need.
---
 .../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java  | 2 +-
 .../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java  | 3 ++-
 .../src/main/java/org/apache/uniffle/common/web/JettyServer.java   | 5 ++---
 .../src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java | 7 +++++--
 4 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 753d759..cdd58a2 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -184,7 +184,7 @@ public class RssShuffleManager implements ShuffleManager {
       int keepAliveTime = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
       threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize * 2, keepAliveTime, TimeUnit.SECONDS,
           Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
-          ThreadUtils.getThreadFactory("SendData"));
+          ThreadUtils.getThreadFactory("SendData-%d"));
 
       if (isDriver) {
         heartBeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 030e56f..6f2df5c 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -186,7 +186,8 @@ public class RssShuffleManager implements ShuffleManager {
     int poolSize = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
     int keepAliveTime = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
     threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize * 2, keepAliveTime, TimeUnit.SECONDS,
-        Queues.newLinkedBlockingQueue(Integer.MAX_VALUE));
+        Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
+        ThreadUtils.getThreadFactory("SendData-%d"));
     if (isDriver) {
       heartBeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
           ThreadUtils.getThreadFactory("rss-heartbeat-%d"));
diff --git a/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java b/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
index c893e70..55752a3 100644
--- a/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import javax.servlet.Servlet;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.eclipse.jetty.http.HttpVersion;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -45,6 +44,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class JettyServer {
 
@@ -89,8 +89,7 @@ public class JettyServer {
     int maxPoolSize = conf.getInteger(RssBaseConf.JETTY_MAX_POOL_SIZE);
     ExecutorThreadPool pool = new ExecutorThreadPool(
         new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Jetty-%d").build()));
+            new LinkedBlockingQueue<>(), ThreadUtils.getThreadFactory("Jetty-%d")));
     return pool;
   }
 
diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index e4023d2..07a6c9d 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -35,6 +35,7 @@ import org.apache.uniffle.client.factory.CoordinatorClientFactory;
 import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
 import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class RegisterHeartBeat {
 
@@ -45,7 +46,8 @@ public class RegisterHeartBeat {
   private final ShuffleServer shuffleServer;
   private final String coordinatorQuorum;
   private final List<CoordinatorClient> coordinatorClients;
-  private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+  private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+      ThreadUtils.getThreadFactory("startHeartBeat-%d"));
   private final ExecutorService heartBeatExecutorService;
   private long heartBeatTimeout;
 
@@ -60,7 +62,8 @@ public class RegisterHeartBeat {
     this.coordinatorClients = factory.createCoordinatorClient(this.coordinatorQuorum);
     this.shuffleServer = shuffleServer;
     this.heartBeatExecutorService = Executors.newFixedThreadPool(
-        conf.getInteger(ShuffleServerConf.SERVER_HEARTBEAT_THREAD_NUM));
+        conf.getInteger(ShuffleServerConf.SERVER_HEARTBEAT_THREAD_NUM),
+        ThreadUtils.getThreadFactory("sendHeartBeat-%d"));
   }
 
   public void startHeartBeat() {