You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/07/12 10:29:35 UTC
[incubator-uniffle] branch master updated: [Improvement] Provides utility classes for creating thread factories (#49)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0f6a896 [Improvement] Provides utility classes for creating thread factories (#49)
0f6a896 is described below
commit 0f6a896efbd3a435de5e7a5d28843ecd05c38bde
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Tue Jul 12 18:29:31 2022 +0800
[Improvement] Provides utility classes for creating thread factories (#49)
### What changes were proposed in this pull request?
Provides a utility class (`ThreadUtils`) for creating thread factories.
### Why are the changes needed?
Creating thread factories through one shared helper removes the repeated `ThreadFactoryBuilder` boilerplate and keeps the code consistent.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Verified with the existing test classes.
---
.../hadoop/mapred/SortWriteBufferManager.java | 7 ++---
.../apache/spark/shuffle/RssShuffleManager.java | 6 ++--
.../apache/spark/shuffle/RssShuffleManager.java | 4 +--
.../client/impl/ShuffleWriteClientImpl.java | 4 +--
.../org/apache/uniffle/common/rpc/GrpcServer.java | 4 +--
.../apache/uniffle/common/util/ThreadUtils.java | 32 ++++++++++++++++++++++
.../coordinator/AccessCandidatesChecker.java | 4 +--
.../uniffle/coordinator/ApplicationManager.java | 4 +--
.../uniffle/coordinator/ClientConfManager.java | 5 ++--
.../uniffle/coordinator/SimpleClusterManager.java | 7 +++--
.../apache/uniffle/server/ShuffleFlushManager.java | 4 +--
.../apache/uniffle/server/ShuffleTaskManager.java | 6 ++--
12 files changed, 59 insertions(+), 28 deletions(-)
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 143ba8e..0382538 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -34,7 +34,6 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.Serializer;
@@ -49,6 +48,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
public class SortWriteBufferManager<K, V> {
@@ -139,10 +139,7 @@ public class SortWriteBufferManager<K, V> {
this.maxBufferSize = maxBufferSize;
this.sendExecutorService = Executors.newFixedThreadPool(
sendThreadNum,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("send-thread-%d")
- .build());
+ ThreadUtils.getThreadFactory("send-thread-%d"));
}
// todo: Single Buffer should also have its size limit
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 738589f..c5da1ac 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -29,7 +29,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
@@ -62,6 +61,7 @@ import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
public class RssShuffleManager implements ShuffleManager {
@@ -194,11 +194,11 @@ public class RssShuffleManager implements ShuffleManager {
RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE_DEFAULT_VALUE);
threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize * 2, keepAliveTime, TimeUnit.SECONDS,
Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SendData").build());
+ ThreadUtils.getThreadFactory("SendData"));
if (isDriver) {
heartBeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("rss-heartbeat-%d").build());
+ ThreadUtils.getThreadFactory("rss-heartbeat-%d"));
}
}
}
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 5c7e2d9..032767a 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -32,7 +32,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
@@ -66,6 +65,7 @@ import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
public class RssShuffleManager implements ShuffleManager {
@@ -200,7 +200,7 @@ public class RssShuffleManager implements ShuffleManager {
Queues.newLinkedBlockingQueue(Integer.MAX_VALUE));
if (isDriver) {
heartBeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("rss-heartbeat-%d").build());
+ ThreadUtils.getThreadFactory("rss-heartbeat-%d"));
}
}
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 168b7c4..669431c 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -33,7 +33,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +71,7 @@ import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
public class ShuffleWriteClientImpl implements ShuffleWriteClient {
@@ -98,7 +98,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
this.retryIntervalMax = retryIntervalMax;
this.coordinatorClientFactory = new CoordinatorClientFactory(clientType);
this.heartBeatExecutorService = Executors.newFixedThreadPool(heartBeatThreadNum,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-heartbeat-%d").build());
+ ThreadUtils.getThreadFactory("client-heartbeat-%d"));
this.replica = replica;
this.replicaWrite = replicaWrite;
this.replicaRead = replicaRead;
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index f3e6edc..1b22196 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
@@ -34,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
public class GrpcServer implements ServerInterface {
@@ -52,7 +52,7 @@ public class GrpcServer implements ServerInterface {
10,
TimeUnit.MINUTES,
Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Grpc-%d").build()
+ ThreadUtils.getThreadFactory("Grpc-%d")
);
boolean isMetricsEnabled = conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
diff --git a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
new file mode 100644
index 0000000..f8000b6
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.util.concurrent.ThreadFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Creates daemon {@link ThreadFactory} instances whose threads are named from the given format (e.g. "Grpc-%d").
+ */
+public class ThreadUtils {
+
+ public static ThreadFactory getThreadFactory(String factoryName) {
+ return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName).build();
+ }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
index 02d8399..51d6fe8 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
@@ -39,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.ThreadUtils;
/**
* AccessCandidatesChecker maintain a list of candidate access id and update it periodically,
@@ -75,7 +75,7 @@ public class AccessCandidatesChecker implements AccessChecker {
int updateIntervalS = conf.getInteger(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC);
updateAccessCandidatesSES = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("UpdateAccessCandidates-%d").build());
+ ThreadUtils.getThreadFactory("UpdateAccessCandidates-%d"));
updateAccessCandidatesSES.scheduleAtFixedRate(
this::updateAccessCandidates, 0, updateIntervalS, TimeUnit.SECONDS);
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 4292de7..9d6f0fc 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -31,13 +31,13 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.ThreadUtils;
public class ApplicationManager {
@@ -59,7 +59,7 @@ public class ApplicationManager {
expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED);
// the thread for checking application status
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ApplicationManager-%d").build());
+ ThreadUtils.getThreadFactory("ApplicationManager-%d"));
scheduledExecutorService.scheduleAtFixedRate(
() -> statusCheck(), expired / 2, expired / 2, TimeUnit.MILLISECONDS);
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
index 85c46b4..c3c1fb5 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
@@ -38,6 +37,8 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.util.ThreadUtils;
+
public class ClientConfManager implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ClientConfManager.class);
@@ -73,7 +74,7 @@ public class ClientConfManager implements Closeable {
int updateIntervalS = conf.getInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC);
updateClientConfSES = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ClientConfManager-%d").build());
+ ThreadUtils.getThreadFactory("ClientConfManager-%d"));
updateClientConfSES.scheduleAtFixedRate(
this::updateClientConf, 0, updateIntervalS, TimeUnit.SECONDS);
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index b96bcd9..98c69a6 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -34,7 +34,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -43,6 +42,8 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.util.ThreadUtils;
+
public class SimpleClusterManager implements ClusterManager {
private static final Logger LOG = LoggerFactory.getLogger(SimpleClusterManager.class);
@@ -63,7 +64,7 @@ public class SimpleClusterManager implements ClusterManager {
this.heartbeatTimeout = conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
// the thread for checking if shuffle server report heartbeat in time
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SimpleClusterManager-%d").build());
+ ThreadUtils.getThreadFactory("SimpleClusterManager-%d"));
scheduledExecutorService.scheduleAtFixedRate(
() -> nodesCheck(), heartbeatTimeout / 3,
heartbeatTimeout / 3, TimeUnit.MILLISECONDS);
@@ -73,7 +74,7 @@ public class SimpleClusterManager implements ClusterManager {
this.hadoopFileSystem = CoordinatorUtils.getFileSystemForPath(new Path(excludeNodesPath), hadoopConf);
long updateNodesInterval = conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL);
checkNodesExecutorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("UpdateExcludeNodes-%d").build());
+ ThreadUtils.getThreadFactory("UpdateExcludeNodes-%d"));
checkNodesExecutorService.scheduleAtFixedRate(
() -> updateExcludeNodes(excludeNodesPath), updateNodesInterval, updateNodesInterval, TimeUnit.MILLISECONDS);
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 367032b..edf2b5c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -28,7 +28,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.RangeMap;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.conf.Configuration;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -37,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.common.Storage;
@@ -84,7 +84,7 @@ public class ShuffleFlushManager {
int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_SIZE);
long keepAliveTime = shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("FlushEventThreadPool").build());
+ ThreadUtils.getThreadFactory("FlushEventThreadPool"));
storageBasePaths = shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH).split(",");
pendingEventTimeoutSec = shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
// the thread for flush data
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 2ea88e7..0876c51 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -33,7 +33,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.roaringbitmap.longlong.LongIterator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
@@ -48,6 +47,7 @@ import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.storage.StorageManager;
@@ -100,12 +100,12 @@ public class ShuffleTaskManager {
this.preAllocationExpired = conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
// the thread for checking application status
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("checkResource-%d").build());
+ ThreadUtils.getThreadFactory("checkResource-%d"));
scheduledExecutorService.scheduleAtFixedRate(
() -> preAllocatedBufferCheck(), preAllocationExpired / 2,
preAllocationExpired / 2, TimeUnit.MILLISECONDS);
this.expiredAppCleanupExecutorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("expiredAppCleaner").build());
+ ThreadUtils.getThreadFactory("expiredAppCleaner"));
expiredAppCleanupExecutorService.scheduleAtFixedRate(
() -> checkResourceStatus(), appExpiredWithoutHB / 2,
appExpiredWithoutHB / 2, TimeUnit.MILLISECONDS);