You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/03 15:27:28 UTC
[incubator-uniffle] branch master updated: [MINOR][IMPROVEMENT] Set heartBeatExecutorService as daemon thread (#121)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fd8ccdd [MINOR][IMPROVEMENT] Set heartBeatExecutorService as daemon thread (#121)
fd8ccdd is described below
commit fd8ccdd920296745b7f27e3f36ed06238b0f274f
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Wed Aug 3 23:27:23 2022 +0800
[MINOR][IMPROVEMENT] Set heartBeatExecutorService as daemon thread (#121)
### What changes were proposed in this pull request?
When the main thread of the shuffleServer exits, the heartbeat thread should exit as well.
### Why are the changes needed?
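Non-daemon heartbeat threads can keep the server process alive after its main thread has exited. Creating the executors through ThreadUtils.getThreadFactory gives these threads descriptive names and (as the JettyServer change suggests) marks them as daemon threads, consistent with the other thread pools.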
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No additional tests were needed.
---
.../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java | 2 +-
.../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java | 3 ++-
.../src/main/java/org/apache/uniffle/common/web/JettyServer.java | 5 ++---
.../src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java | 7 +++++--
4 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 753d759..cdd58a2 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -184,7 +184,7 @@ public class RssShuffleManager implements ShuffleManager {
int keepAliveTime = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize * 2, keepAliveTime, TimeUnit.SECONDS,
Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
- ThreadUtils.getThreadFactory("SendData"));
+ ThreadUtils.getThreadFactory("SendData-%d"));
if (isDriver) {
heartBeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 030e56f..6f2df5c 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -186,7 +186,8 @@ public class RssShuffleManager implements ShuffleManager {
int poolSize = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
int keepAliveTime = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize * 2, keepAliveTime, TimeUnit.SECONDS,
- Queues.newLinkedBlockingQueue(Integer.MAX_VALUE));
+ Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
+ ThreadUtils.getThreadFactory("SendData-%d"));
if (isDriver) {
heartBeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.getThreadFactory("rss-heartbeat-%d"));
diff --git a/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java b/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
index c893e70..55752a3 100644
--- a/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.servlet.Servlet;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -45,6 +44,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
public class JettyServer {
@@ -89,8 +89,7 @@ public class JettyServer {
int maxPoolSize = conf.getInteger(RssBaseConf.JETTY_MAX_POOL_SIZE);
ExecutorThreadPool pool = new ExecutorThreadPool(
new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Jetty-%d").build()));
+ new LinkedBlockingQueue<>(), ThreadUtils.getThreadFactory("Jetty-%d")));
return pool;
}
diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index e4023d2..07a6c9d 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -35,6 +35,7 @@ import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
+import org.apache.uniffle.common.util.ThreadUtils;
public class RegisterHeartBeat {
@@ -45,7 +46,8 @@ public class RegisterHeartBeat {
private final ShuffleServer shuffleServer;
private final String coordinatorQuorum;
private final List<CoordinatorClient> coordinatorClients;
- private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+ private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.getThreadFactory("startHeartBeat-%d"));
private final ExecutorService heartBeatExecutorService;
private long heartBeatTimeout;
@@ -60,7 +62,8 @@ public class RegisterHeartBeat {
this.coordinatorClients = factory.createCoordinatorClient(this.coordinatorQuorum);
this.shuffleServer = shuffleServer;
this.heartBeatExecutorService = Executors.newFixedThreadPool(
- conf.getInteger(ShuffleServerConf.SERVER_HEARTBEAT_THREAD_NUM));
+ conf.getInteger(ShuffleServerConf.SERVER_HEARTBEAT_THREAD_NUM),
+ ThreadUtils.getThreadFactory("sendHeartBeat-%d"));
}
public void startHeartBeat() {