You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/08 12:24:53 UTC
[rocketmq] branch develop updated: [ISSUE #6189] Replace ThreadFactory create (#6190)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5c45294c4 [ISSUE #6189] Replace ThreadFactory create (#6190)
5c45294c4 is described below
commit 5c45294c45199d19c378f8c02049a3804acd5872
Author: hardyfish <85...@users.noreply.github.com>
AuthorDate: Wed Mar 8 20:24:30 2023 +0800
[ISSUE #6189] Replace ThreadFactory create (#6190)
* Replace ThreadFactory create
* delete unnecessary imports
* remove useless import
* reformat code
---
.../broker/topic/TopicRouteInfoManager.java | 9 ++---
.../impl/consumer/DefaultLitePullConsumerImpl.java | 8 +----
.../client/impl/consumer/PullMessageService.java | 13 +++----
.../impl/producer/DefaultMQProducerImpl.java | 33 +++++++-----------
.../apache/rocketmq/common/utils/ThreadUtils.java | 32 ++++-------------
.../rocketmq/example/simple/PullConsumer.java | 13 +++----
.../remoting/netty/NettyRemotingClient.java | 40 +++-------------------
.../netty/RemotingCodeDistributionHandlerTest.java | 12 ++-----
8 files changed, 38 insertions(+), 122 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
index cc9c62500..b35564725 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -34,6 +33,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -66,12 +66,7 @@ public class TopicRouteInfoManager {
}
public void start() {
- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "TopicRouteInfoManagerScheduledThread");
- }
- });
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index e5aed64d3..2d37581bb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.Validators;
@@ -169,12 +168,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "MonitorMessageQueueChangeThread");
- }
- });
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorMessageQueueChangeThread"));
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index d4801c335..b5e6f9f79 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -19,10 +19,10 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -34,12 +34,7 @@ public class PullMessageService extends ServiceThread {
private final MQClientInstance mQClientFactory;
private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "PullMessageServiceScheduledThread");
- }
- });
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("PullMessageServiceScheduledThread"));
public PullMessageService(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
@@ -127,9 +122,9 @@ public class PullMessageService extends ServiceThread {
try {
MessageRequest messageRequest = this.messageRequestQueue.take();
if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) {
- this.popMessage((PopRequest)messageRequest);
+ this.popMessage((PopRequest) messageRequest);
} else {
- this.pullMessage((PullRequest)messageRequest);
+ this.pullMessage((PullRequest) messageRequest);
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 4f6fbafdd..4677df690 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -31,10 +31,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -67,6 +65,7 @@ import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.compression.Compressor;
@@ -140,23 +139,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
- new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
- }
- });
+ new ThreadFactoryImpl("AsyncSenderExecutor_"));
if (defaultMQProducer.getBackPressureForAsyncSendNum() > 10) {
- semaphoreAsyncSendNum = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),10), true);
+ semaphoreAsyncSendNum = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(), 10), true);
} else {
semaphoreAsyncSendNum = new Semaphore(10, true);
log.info("semaphoreAsyncSendNum can not be smaller than 10.");
}
if (defaultMQProducer.getBackPressureForAsyncSendNum() > 1024 * 1024) {
- semaphoreAsyncSendSize = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),1024 * 1024), true);
+ semaphoreAsyncSendSize = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(), 1024 * 1024), true);
} else {
semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true);
log.info("semaphoreAsyncSendSize can not be smaller than 1M.");
@@ -529,7 +521,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
} else {
sendCallback.onException(
- new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
+ new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
};
@@ -537,8 +529,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback,
- final long timeout, final long beginStartTime)
- throws MQClientException, InterruptedException {
+ final long timeout, final long beginStartTime)
+ throws MQClientException, InterruptedException {
ExecutorService executor = this.getAsyncSenderExecutor();
boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
boolean isSemaphoreAsyncNumAquired = false;
@@ -549,18 +541,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (isEnableBackpressureForAsyncMode) {
long costTime = System.currentTimeMillis() - beginStartTime;
isSemaphoreAsyncNumAquired = timeout - costTime > 0
- && semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
+ && semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
if (!isSemaphoreAsyncNumAquired) {
sendCallback.onException(
- new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
+ new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
return;
}
costTime = System.currentTimeMillis() - beginStartTime;
isSemaphoreAsyncSizeAquired = timeout - costTime > 0
- && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
+ && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
if (!isSemaphoreAsyncSizeAquired) {
sendCallback.onException(
- new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
+ new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
return;
}
}
@@ -1035,6 +1027,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
executeEndTransactionHook(context);
}
}
+
/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
@@ -1231,7 +1224,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
try {
try {
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
- timeout - costTime);
+ timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
index 99526f3a1..4b366d4e3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -63,38 +63,20 @@ public final class ThreadUtils {
}
public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
- return new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
- thread.setDaemon(isDaemon);
- return thread;
- }
- };
+ return new ThreadFactoryImpl(processName + "_", isDaemon);
}
public static ThreadFactory newGenericThreadFactory(final String processName, final int threads,
final boolean isDaemon) {
- return new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet()));
- thread.setDaemon(isDaemon);
- return thread;
- }
- };
+ return new ThreadFactoryImpl(String.format("%s_%d_", processName, threads), isDaemon);
}
/**
* Create a new thread
*
- * @param name The name of the thread
+ * @param name The name of the thread
* @param runnable The work for the thread to do
- * @param daemon Should the thread block JVM stop?
+ * @param daemon Should the thread block JVM stop?
* @return The unstarted thread
*/
public static Thread newThread(String name, Runnable runnable, boolean daemon) {
@@ -121,7 +103,7 @@ public final class ThreadUtils {
* Shutdown passed thread using isAlive and join.
*
* @param millis Pass 0 if we're to wait forever.
- * @param t Thread to stop
+ * @param t Thread to stop
*/
public static void shutdownGracefully(final Thread t, final long millis) {
if (t == null)
@@ -141,7 +123,7 @@ public final class ThreadUtils {
* {@link ExecutorService}.
*
* @param executor executor
- * @param timeout timeout
+ * @param timeout timeout
* @param timeUnit timeUnit
*/
public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
index ff9ef9ca3..5ac8d247d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
@@ -21,13 +21,13 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -40,17 +40,12 @@ public class PullConsumer {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
Set<String> topics = new HashSet<>();
- //You would better to register topics,It will use in rebalance when starting
+ //You would be better to register topics,It will use in rebalance when starting
topics.add("TopicTest");
consumer.setRegisterTopics(topics);
consumer.start();
- ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "PullConsumerThread");
- }
- });
+ ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactoryImpl("PullConsumerThread"));
for (String topic : consumer.getRegisterTopics()) {
executors.execute(new Runnable() {
@@ -137,7 +132,7 @@ public class PullConsumer {
public void incPullTPS(String topic, int pullSize) {
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
- .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
+ .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
}
});
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 540e0b76d..69d8a275b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -70,6 +69,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -139,37 +139,15 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
publicThreadNums = 4;
}
- this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
- }
- });
+ this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactoryImpl("NettyClientPublicExecutor_"));
this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(32), new ThreadFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet());
- }
- }
- );
+ new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("NettyClientScan_thread_"));
if (eventLoopGroup != null) {
this.eventLoopGroupWorker = eventLoopGroup;
} else {
- this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
- }
- });
+ this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactoryImpl("NettyClientSelector_"));
}
this.defaultEventExecutorGroup = eventExecutorGroup;
@@ -205,15 +183,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
if (this.defaultEventExecutorGroup == null) {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
- new ThreadFactory() {
-
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
- }
- });
+ new ThreadFactoryImpl("NettyClientWorkerThread_"));
}
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
index ee6f3f6c2..eb623a9de 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
@@ -21,9 +21,8 @@ import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.junit.Assert;
import org.junit.Test;
@@ -45,14 +44,7 @@ public class RemotingCodeDistributionHandlerTest {
int count = 1000 * 1000;
CountDownLatch latch = new CountDownLatch(threadCount);
AtomicBoolean result = new AtomicBoolean(true);
- ExecutorService executorService = Executors.newFixedThreadPool(threadCount, new ThreadFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "RemotingCodeTest_" + this.threadIndex.incrementAndGet());
- }
- });
+ ExecutorService executorService = Executors.newFixedThreadPool(threadCount, new ThreadFactoryImpl("RemotingCodeTest_"));
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {