You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by js...@apache.org on 2022/07/01 06:57:59 UTC
[incubator-uniffle] 15/17: [Minor] Make clearResourceThread and processEventThread daemon (#207)
This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit ba47aa017f67e681af7c311c4ef8578eef740d4b
Author: Zhen Wang <64...@qq.com>
AuthorDate: Thu Jun 30 14:56:54 2022 +0800
[Minor] Make clearResourceThread and processEventThread daemon (#207)
### What changes were proposed in this pull request?
Make clearResourceThread daemon and processEventThread daemon.
### Why are the changes needed?
`clearResourceThread` and `processEventThread` never exits, we can make it daemon.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Nod
---
.../java/com/tencent/rss/server/ShuffleFlushManager.java | 12 ++++++++----
.../main/java/com/tencent/rss/server/ShuffleTaskManager.java | 1 +
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java b/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
index e246b02..be941ac 100644
--- a/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
+++ b/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
@@ -29,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.RangeMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.conf.Configuration;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -60,7 +61,6 @@ public class ShuffleFlushManager {
private Map<String, Map<Integer, RangeMap<Integer, ShuffleWriteHandler>>> handlers = Maps.newConcurrentMap();
// appId -> shuffleId -> committed shuffle blockIds
private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds = Maps.newConcurrentMap();
- private Runnable processEventThread;
private final int retryMax;
private final StorageManager storageManager;
@@ -84,11 +84,12 @@ public class ShuffleFlushManager {
BlockingQueue<Runnable> waitQueue = Queues.newLinkedBlockingQueue(waitQueueSize);
int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_SIZE);
long keepAliveTime = shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
- threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue);
+ threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("FlushEventThreadPool").build());
storageBasePaths = shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH).split(",");
pendingEventTimeoutSec = shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
// the thread for flush data
- processEventThread = () -> {
+ Runnable processEventRunnable = () -> {
while (true) {
try {
ShuffleDataFlushEvent event = flushQueue.take();
@@ -103,7 +104,10 @@ public class ShuffleFlushManager {
}
}
};
- new Thread(processEventThread).start();
+ Thread processEventThread = new Thread(processEventRunnable);
+ processEventThread.setName("ProcessEventThread");
+ processEventThread.setDaemon(true);
+ processEventThread.start();
// todo: extract a class named Service, and support stop method
Thread thread = new Thread("PendingEventProcessThread") {
@Override
diff --git a/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java b/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java
index e847779..fc37a19 100644
--- a/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java
+++ b/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java
@@ -123,6 +123,7 @@ public class ShuffleTaskManager {
};
Thread thread = new Thread(clearResourceThread);
thread.setName("clearResourceThread");
+ thread.setDaemon(true);
thread.start();
}