You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/08 12:24:53 UTC

[rocketmq] branch develop updated: [ISSUE #6189] Replace ThreadFactory create (#6190)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5c45294c4 [ISSUE #6189] Replace ThreadFactory create (#6190)
5c45294c4 is described below

commit 5c45294c45199d19c378f8c02049a3804acd5872
Author: hardyfish <85...@users.noreply.github.com>
AuthorDate: Wed Mar 8 20:24:30 2023 +0800

    [ISSUE #6189] Replace ThreadFactory create (#6190)
    
    * Replace ThreadFactory create
    
    * delete unnecessary imports
    
    * remove useless import
    
    * reformat code
---
 .../broker/topic/TopicRouteInfoManager.java        |  9 ++---
 .../impl/consumer/DefaultLitePullConsumerImpl.java |  8 +----
 .../client/impl/consumer/PullMessageService.java   | 13 +++----
 .../impl/producer/DefaultMQProducerImpl.java       | 33 +++++++-----------
 .../apache/rocketmq/common/utils/ThreadUtils.java  | 32 ++++-------------
 .../rocketmq/example/simple/PullConsumer.java      | 13 +++----
 .../remoting/netty/NettyRemotingClient.java        | 40 +++-------------------
 .../netty/RemotingCodeDistributionHandlerTest.java | 12 ++-----
 8 files changed, 38 insertions(+), 122 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
index cc9c62500..b35564725 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -34,6 +33,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -66,12 +66,7 @@ public class TopicRouteInfoManager {
     }
 
     public void start() {
-        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "TopicRouteInfoManagerScheduledThread");
-            }
-        });
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));
 
         this.scheduledExecutorService.scheduleAtFixedRate(() -> {
             try {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index e5aed64d3..2d37581bb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.Validators;
@@ -169,12 +168,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
             this.defaultLitePullConsumer.getPullThreadNums(),
             new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
         );
-        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "MonitorMessageQueueChangeThread");
-            }
-        });
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorMessageQueueChangeThread"));
         this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index d4801c335..b5e6f9f79 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -19,10 +19,10 @@ package org.apache.rocketmq.client.impl.consumer;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -34,12 +34,7 @@ public class PullMessageService extends ServiceThread {
 
     private final MQClientInstance mQClientFactory;
     private final ScheduledExecutorService scheduledExecutorService = Executors
-        .newSingleThreadScheduledExecutor(new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "PullMessageServiceScheduledThread");
-            }
-        });
+        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("PullMessageServiceScheduledThread"));
 
     public PullMessageService(MQClientInstance mQClientFactory) {
         this.mQClientFactory = mQClientFactory;
@@ -127,9 +122,9 @@ public class PullMessageService extends ServiceThread {
             try {
                 MessageRequest messageRequest = this.messageRequestQueue.take();
                 if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) {
-                    this.popMessage((PopRequest)messageRequest);
+                    this.popMessage((PopRequest) messageRequest);
                 } else {
-                    this.pullMessage((PullRequest)messageRequest);
+                    this.pullMessage((PullRequest) messageRequest);
                 }
             } catch (InterruptedException ignored) {
             } catch (Exception e) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 4f6fbafdd..4677df690 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -31,10 +31,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -67,6 +65,7 @@ import org.apache.rocketmq.client.producer.TransactionMQProducer;
 import org.apache.rocketmq.client.producer.TransactionSendResult;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.compression.CompressionType;
 import org.apache.rocketmq.common.compression.Compressor;
@@ -140,23 +139,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             1000 * 60,
             TimeUnit.MILLISECONDS,
             this.asyncSenderThreadPoolQueue,
-            new ThreadFactory() {
-                private AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
-                }
-            });
+            new ThreadFactoryImpl("AsyncSenderExecutor_"));
         if (defaultMQProducer.getBackPressureForAsyncSendNum() > 10) {
-            semaphoreAsyncSendNum = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),10), true);
+            semaphoreAsyncSendNum = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(), 10), true);
         } else {
             semaphoreAsyncSendNum = new Semaphore(10, true);
             log.info("semaphoreAsyncSendNum can not be smaller than 10.");
         }
 
         if (defaultMQProducer.getBackPressureForAsyncSendNum() > 1024 * 1024) {
-            semaphoreAsyncSendSize = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(),1024 * 1024), true);
+            semaphoreAsyncSendSize = new Semaphore(Math.max(defaultMQProducer.getBackPressureForAsyncSendNum(), 1024 * 1024), true);
         } else {
             semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true);
             log.info("semaphoreAsyncSendSize can not be smaller than 1M.");
@@ -529,7 +521,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                     }
                 } else {
                     sendCallback.onException(
-                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
+                        new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                 }
             }
         };
@@ -537,8 +529,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback,
-                                         final long timeout, final long beginStartTime)
-            throws MQClientException, InterruptedException {
+        final long timeout, final long beginStartTime)
+        throws MQClientException, InterruptedException {
         ExecutorService executor = this.getAsyncSenderExecutor();
         boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
         boolean isSemaphoreAsyncNumAquired = false;
@@ -549,18 +541,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             if (isEnableBackpressureForAsyncMode) {
                 long costTime = System.currentTimeMillis() - beginStartTime;
                 isSemaphoreAsyncNumAquired = timeout - costTime > 0
-                        && semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
+                    && semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
                 if (!isSemaphoreAsyncNumAquired) {
                     sendCallback.onException(
-                            new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
+                        new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
                     return;
                 }
                 costTime = System.currentTimeMillis() - beginStartTime;
                 isSemaphoreAsyncSizeAquired = timeout - costTime > 0
-                        && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
+                    && semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
                 if (!isSemaphoreAsyncSizeAquired) {
                     sendCallback.onException(
-                            new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
+                        new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
                     return;
                 }
             }
@@ -1035,6 +1027,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             executeEndTransactionHook(context);
         }
     }
+
     /**
      * DEFAULT ONEWAY -------------------------------------------------------
      */
@@ -1231,7 +1224,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                     try {
                         try {
                             sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
-                                    timeout - costTime);
+                                timeout - costTime);
                         } catch (MQBrokerException e) {
                             throw new MQClientException("unknown exception", e);
                         }
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
index 99526f3a1..4b366d4e3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -63,38 +63,20 @@ public final class ThreadUtils {
     }
 
     public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
-        return new ThreadFactory() {
-            private AtomicInteger threadIndex = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
-                thread.setDaemon(isDaemon);
-                return thread;
-            }
-        };
+        return new ThreadFactoryImpl(processName + "_", isDaemon);
     }
 
     public static ThreadFactory newGenericThreadFactory(final String processName, final int threads,
         final boolean isDaemon) {
-        return new ThreadFactory() {
-            private AtomicInteger threadIndex = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet()));
-                thread.setDaemon(isDaemon);
-                return thread;
-            }
-        };
+        return new ThreadFactoryImpl(String.format("%s_%d_", processName, threads), isDaemon);
     }
 
     /**
      * Create a new thread
      *
-     * @param name The name of the thread
+     * @param name     The name of the thread
      * @param runnable The work for the thread to do
-     * @param daemon Should the thread block JVM stop?
+     * @param daemon   Should the thread block JVM stop?
      * @return The unstarted thread
      */
     public static Thread newThread(String name, Runnable runnable, boolean daemon) {
@@ -121,7 +103,7 @@ public final class ThreadUtils {
      * Shutdown passed thread using isAlive and join.
      *
      * @param millis Pass 0 if we're to wait forever.
-     * @param t Thread to stop
+     * @param t      Thread to stop
      */
     public static void shutdownGracefully(final Thread t, final long millis) {
         if (t == null)
@@ -141,7 +123,7 @@ public final class ThreadUtils {
      * {@link ExecutorService}.
      *
      * @param executor executor
-     * @param timeout timeout
+     * @param timeout  timeout
      * @param timeUnit timeUnit
      */
     public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
index ff9ef9ca3..5ac8d247d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
@@ -21,13 +21,13 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -40,17 +40,12 @@ public class PullConsumer {
         DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
         consumer.setNamesrvAddr("127.0.0.1:9876");
         Set<String> topics = new HashSet<>();
-        //You would better to register topics,It will use in rebalance when starting
+        //You would be better to register topics,It will use in rebalance when starting
         topics.add("TopicTest");
         consumer.setRegisterTopics(topics);
         consumer.start();
 
-        ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "PullConsumerThread");
-            }
-        });
+        ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactoryImpl("PullConsumerThread"));
         for (String topic : consumer.getRegisterTopics()) {
 
             executors.execute(new Runnable() {
@@ -137,7 +132,7 @@ public class PullConsumer {
 
                 public void incPullTPS(String topic, int pullSize) {
                     consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
-                            .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
+                        .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
                 }
             });
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 540e0b76d..69d8a275b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -70,6 +69,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -139,37 +139,15 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             publicThreadNums = 4;
         }
 
-        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
-            private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
-            }
-        });
+        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactoryImpl("NettyClientPublicExecutor_"));
 
         this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(32), new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet());
-                }
-            }
-        );
+            new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("NettyClientScan_thread_"));
 
         if (eventLoopGroup != null) {
             this.eventLoopGroupWorker = eventLoopGroup;
         } else {
-            this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
+            this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactoryImpl("NettyClientSelector_"));
         }
         this.defaultEventExecutorGroup = eventExecutorGroup;
 
@@ -205,15 +183,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         if (this.defaultEventExecutorGroup == null) {
             this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                 nettyClientConfig.getClientWorkerThreads(),
-                new ThreadFactory() {
-
-                    private AtomicInteger threadIndex = new AtomicInteger(0);
-
-                    @Override
-                    public Thread newThread(Runnable r) {
-                        return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
-                    }
-                });
+                new ThreadFactoryImpl("NettyClientWorkerThread_"));
         }
         Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
index ee6f3f6c2..eb623a9de 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
@@ -21,9 +21,8 @@ import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -45,14 +44,7 @@ public class RemotingCodeDistributionHandlerTest {
         int count = 1000 * 1000;
         CountDownLatch latch = new CountDownLatch(threadCount);
         AtomicBoolean result = new AtomicBoolean(true);
-        ExecutorService executorService = Executors.newFixedThreadPool(threadCount, new ThreadFactory() {
-            private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "RemotingCodeTest_" + this.threadIndex.incrementAndGet());
-            }
-        });
+        ExecutorService executorService = Executors.newFixedThreadPool(threadCount, new ThreadFactoryImpl("RemotingCodeTest_"));
 
         for (int i = 0; i < threadCount; i++) {
             executorService.submit(() -> {