You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by js...@apache.org on 2022/07/01 06:57:59 UTC

[incubator-uniffle] 15/17: [Minor] Make clearResourceThread and processEventThread daemon (#207)

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit ba47aa017f67e681af7c311c4ef8578eef740d4b
Author: Zhen Wang <64...@qq.com>
AuthorDate: Thu Jun 30 14:56:54 2022 +0800

    [Minor] Make clearResourceThread and processEventThread daemon (#207)
    
    ### What changes were proposed in this pull request?
    Make `clearResourceThread` and `processEventThread` daemon threads.
    
    ### Why are the changes needed?
    `clearResourceThread` and `processEventThread` never exit, so we can make them daemon threads.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No
---
 .../java/com/tencent/rss/server/ShuffleFlushManager.java     | 12 ++++++++----
 .../main/java/com/tencent/rss/server/ShuffleTaskManager.java |  1 +
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java b/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
index e246b02..be941ac 100644
--- a/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
+++ b/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
@@ -29,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.RangeMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -60,7 +61,6 @@ public class ShuffleFlushManager {
   private Map<String, Map<Integer, RangeMap<Integer, ShuffleWriteHandler>>> handlers = Maps.newConcurrentMap();
   // appId -> shuffleId -> committed shuffle blockIds
   private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds = Maps.newConcurrentMap();
-  private Runnable processEventThread;
   private final int retryMax;
 
   private final StorageManager storageManager;
@@ -84,11 +84,12 @@ public class ShuffleFlushManager {
     BlockingQueue<Runnable> waitQueue = Queues.newLinkedBlockingQueue(waitQueueSize);
     int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_SIZE);
     long keepAliveTime = shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
-    threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue);
+    threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("FlushEventThreadPool").build());
     storageBasePaths = shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH).split(",");
     pendingEventTimeoutSec = shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
     // the thread for flush data
-    processEventThread = () -> {
+    Runnable processEventRunnable = () -> {
       while (true) {
         try {
           ShuffleDataFlushEvent event = flushQueue.take();
@@ -103,7 +104,10 @@ public class ShuffleFlushManager {
         }
       }
     };
-    new Thread(processEventThread).start();
+    Thread processEventThread = new Thread(processEventRunnable);
+    processEventThread.setName("ProcessEventThread");
+    processEventThread.setDaemon(true);
+    processEventThread.start();
     // todo: extract a class named Service, and support stop method
     Thread thread = new Thread("PendingEventProcessThread") {
       @Override
diff --git a/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java b/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java
index e847779..fc37a19 100644
--- a/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java
+++ b/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java
@@ -123,6 +123,7 @@ public class ShuffleTaskManager {
     };
     Thread thread = new Thread(clearResourceThread);
     thread.setName("clearResourceThread");
+    thread.setDaemon(true);
     thread.start();
   }