You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/07/12 10:29:35 UTC

[incubator-uniffle] branch master updated: [Improvement] Provides utility classes for creating thread factories (#49)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f6a896  [Improvement] Provides utility classes for creating thread factories (#49)
0f6a896 is described below

commit 0f6a896efbd3a435de5e7a5d28843ecd05c38bde
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Tue Jul 12 18:29:31 2022 +0800

    [Improvement] Provides utility classes for creating thread factories (#49)
    
    ### What changes were proposed in this pull request?
    Provides tool classes for creating thread factories
    
    ### Why are the changes needed?
    Make the code more standardized and beautiful
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Use the original test class
---
 .../hadoop/mapred/SortWriteBufferManager.java      |  7 ++---
 .../apache/spark/shuffle/RssShuffleManager.java    |  6 ++--
 .../apache/spark/shuffle/RssShuffleManager.java    |  4 +--
 .../client/impl/ShuffleWriteClientImpl.java        |  4 +--
 .../org/apache/uniffle/common/rpc/GrpcServer.java  |  4 +--
 .../apache/uniffle/common/util/ThreadUtils.java    | 32 ++++++++++++++++++++++
 .../coordinator/AccessCandidatesChecker.java       |  4 +--
 .../uniffle/coordinator/ApplicationManager.java    |  4 +--
 .../uniffle/coordinator/ClientConfManager.java     |  5 ++--
 .../uniffle/coordinator/SimpleClusterManager.java  |  7 +++--
 .../apache/uniffle/server/ShuffleFlushManager.java |  4 +--
 .../apache/uniffle/server/ShuffleTaskManager.java  |  6 ++--
 12 files changed, 59 insertions(+), 28 deletions(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 143ba8e..0382538 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -34,7 +34,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.serializer.Serializer;
@@ -49,6 +48,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class SortWriteBufferManager<K, V> {
 
@@ -139,10 +139,7 @@ public class SortWriteBufferManager<K, V> {
     this.maxBufferSize = maxBufferSize;
     this.sendExecutorService  = Executors.newFixedThreadPool(
         sendThreadNum,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("send-thread-%d")
-            .build());
+        ThreadUtils.getThreadFactory("send-thread-%d"));
   }
 
   // todo: Single Buffer should also have its size limit
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 738589f..c5da1ac 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -29,7 +29,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -62,6 +61,7 @@ import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class RssShuffleManager implements ShuffleManager {
 
@@ -194,11 +194,11 @@ public class RssShuffleManager implements ShuffleManager {
           RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE_DEFAULT_VALUE);
       threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize * 2, keepAliveTime, TimeUnit.SECONDS,
           Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SendData").build());
+          ThreadUtils.getThreadFactory("SendData"));
 
       if (isDriver) {
         heartBeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("rss-heartbeat-%d").build());
+            ThreadUtils.getThreadFactory("rss-heartbeat-%d"));
       }
     }
   }
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 5c7e2d9..032767a 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -32,7 +32,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -66,6 +65,7 @@ import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class RssShuffleManager implements ShuffleManager {
 
@@ -200,7 +200,7 @@ public class RssShuffleManager implements ShuffleManager {
         Queues.newLinkedBlockingQueue(Integer.MAX_VALUE));
     if (isDriver) {
       heartBeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-              new ThreadFactoryBuilder().setDaemon(true).setNameFormat("rss-heartbeat-%d").build());
+          ThreadUtils.getThreadFactory("rss-heartbeat-%d"));
     }
   }
 
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 168b7c4..669431c 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -33,7 +33,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +71,7 @@ import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class ShuffleWriteClientImpl implements ShuffleWriteClient {
 
@@ -98,7 +98,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     this.retryIntervalMax = retryIntervalMax;
     this.coordinatorClientFactory = new CoordinatorClientFactory(clientType);
     this.heartBeatExecutorService = Executors.newFixedThreadPool(heartBeatThreadNum,
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-heartbeat-%d").build());
+        ThreadUtils.getThreadFactory("client-heartbeat-%d"));
     this.replica = replica;
     this.replicaWrite = replicaWrite;
     this.replicaRead = replicaRead;
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index f3e6edc..1b22196 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.grpc.BindableService;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
@@ -34,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class GrpcServer implements ServerInterface {
 
@@ -52,7 +52,7 @@ public class GrpcServer implements ServerInterface {
         10,
         TimeUnit.MINUTES,
         Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Grpc-%d").build()
+        ThreadUtils.getThreadFactory("Grpc-%d")
     );
 
     boolean isMetricsEnabled = conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
diff --git a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
new file mode 100644
index 0000000..f8000b6
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.util.concurrent.ThreadFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Provide a general method to create a thread factory to make the code more standardized
+ */
+public class ThreadUtils {
+
+  public static ThreadFactory getThreadFactory(String factoryName) {
+    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName).build();
+  }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
index 02d8399..51d6fe8 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -39,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 /**
  * AccessCandidatesChecker maintain a list of candidate access id and update it periodically,
@@ -75,7 +75,7 @@ public class AccessCandidatesChecker implements AccessChecker {
 
     int updateIntervalS = conf.getInteger(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC);
     updateAccessCandidatesSES = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("UpdateAccessCandidates-%d").build());
+        ThreadUtils.getThreadFactory("UpdateAccessCandidates-%d"));
     updateAccessCandidatesSES.scheduleAtFixedRate(
         this::updateAccessCandidates, 0, updateIntervalS, TimeUnit.SECONDS);
   }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 4292de7..9d6f0fc 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -31,13 +31,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class ApplicationManager {
 
@@ -59,7 +59,7 @@ public class ApplicationManager {
     expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED);
     // the thread for checking application status
     scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ApplicationManager-%d").build());
+        ThreadUtils.getThreadFactory("ApplicationManager-%d"));
     scheduledExecutorService.scheduleAtFixedRate(
         () -> statusCheck(), expired / 2, expired / 2, TimeUnit.MILLISECONDS);
   }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
index 85c46b4..c3c1fb5 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -38,6 +37,8 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.util.ThreadUtils;
+
 public class ClientConfManager implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(ClientConfManager.class);
 
@@ -73,7 +74,7 @@ public class ClientConfManager implements Closeable {
 
     int updateIntervalS = conf.getInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC);
     updateClientConfSES = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ClientConfManager-%d").build());
+        ThreadUtils.getThreadFactory("ClientConfManager-%d"));
     updateClientConfSES.scheduleAtFixedRate(
         this::updateClientConf, 0, updateIntervalS, TimeUnit.SECONDS);
   }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index b96bcd9..98c69a6 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -34,7 +34,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,6 +42,8 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.util.ThreadUtils;
+
 public class SimpleClusterManager implements ClusterManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(SimpleClusterManager.class);
@@ -63,7 +64,7 @@ public class SimpleClusterManager implements ClusterManager {
     this.heartbeatTimeout = conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
     // the thread for checking if shuffle server report heartbeat in time
     scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SimpleClusterManager-%d").build());
+        ThreadUtils.getThreadFactory("SimpleClusterManager-%d"));
     scheduledExecutorService.scheduleAtFixedRate(
         () -> nodesCheck(), heartbeatTimeout / 3,
         heartbeatTimeout / 3, TimeUnit.MILLISECONDS);
@@ -73,7 +74,7 @@ public class SimpleClusterManager implements ClusterManager {
       this.hadoopFileSystem = CoordinatorUtils.getFileSystemForPath(new Path(excludeNodesPath), hadoopConf);
       long updateNodesInterval = conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL);
       checkNodesExecutorService = Executors.newSingleThreadScheduledExecutor(
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("UpdateExcludeNodes-%d").build());
+          ThreadUtils.getThreadFactory("UpdateExcludeNodes-%d"));
       checkNodesExecutorService.scheduleAtFixedRate(
           () -> updateExcludeNodes(excludeNodesPath), updateNodesInterval, updateNodesInterval, TimeUnit.MILLISECONDS);
     }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 367032b..edf2b5c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -28,7 +28,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.RangeMap;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -37,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.common.Storage;
@@ -84,7 +84,7 @@ public class ShuffleFlushManager {
     int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_SIZE);
     long keepAliveTime = shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
     threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue,
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("FlushEventThreadPool").build());
+        ThreadUtils.getThreadFactory("FlushEventThreadPool"));
     storageBasePaths = shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH).split(",");
     pendingEventTimeoutSec = shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
     // the thread for flush data
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 2ea88e7..0876c51 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -33,7 +33,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.roaringbitmap.longlong.LongIterator;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -48,6 +47,7 @@ import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.storage.StorageManager;
@@ -100,12 +100,12 @@ public class ShuffleTaskManager {
     this.preAllocationExpired = conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
     // the thread for checking application status
     this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("checkResource-%d").build());
+        ThreadUtils.getThreadFactory("checkResource-%d"));
     scheduledExecutorService.scheduleAtFixedRate(
         () -> preAllocatedBufferCheck(), preAllocationExpired / 2,
         preAllocationExpired / 2, TimeUnit.MILLISECONDS);
     this.expiredAppCleanupExecutorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("expiredAppCleaner").build());
+        ThreadUtils.getThreadFactory("expiredAppCleaner"));
     expiredAppCleanupExecutorService.scheduleAtFixedRate(
         () -> checkResourceStatus(), appExpiredWithoutHB / 2,
         appExpiredWithoutHB / 2, TimeUnit.MILLISECONDS);