You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/02/08 00:41:04 UTC

[incubator-eventmesh] branch master updated: [ISSUE #3028] Refactor ThreadFactory implement (#3044)

This is an automated email from the ASF dual-hosted git repository.

jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new b305df018 [ISSUE #3028] Refactor ThreadFactory implement (#3044)
b305df018 is described below

commit b305df0188ef7ea2e22c7a996dae766205908186
Author: mxsm <lj...@gmail.com>
AuthorDate: Wed Feb 8 08:40:59 2023 +0800

    [ISSUE #3028] Refactor ThreadFactory implement (#3044)
    
    LGTM
---
 .../eventmesh/common/EventMeshThreadFactory.java   |  81 ++++++++++++++++
 .../apache/eventmesh/common/ThreadPoolFactory.java |  47 ++--------
 .../common/EventMeshThreadFactoryTest.java         |  14 ++-
 .../rabbitmq/consumer/RabbitmqConsumer.java        |   2 +-
 .../ConsumeMessageConcurrentlyService.java         |   8 +-
 .../pub/eventmeshmessage/AsyncPublishInstance.java |   1 -
 .../pub/eventmeshmessage/AsyncPublishInstance.java |   7 --
 .../eventmesh/runtime/boot/AbstractHTTPServer.java |   2 +-
 .../runtime/boot/AbstractRemotingServer.java       |  34 +------
 .../runtime/boot/EventMeshGrpcServer.java          |   8 +-
 .../runtime/boot/EventMeshHTTPServer.java          |  28 +++---
 .../eventmesh/runtime/boot/EventMeshTCPServer.java |   8 +-
 .../core/protocol/grpc/retry/GrpcRetryer.java      |  22 ++---
 .../protocol/http/consumer/EventMeshConsumer.java  |  88 +++++++++---------
 .../core/protocol/http/retry/HttpRetryer.java      |  25 ++---
 .../rebalance/EventMeshRebalanceService.java       |   6 +-
 .../client/session/retry/EventMeshTcpRetryer.java  |   4 +-
 .../runtime/metrics/grpc/EventMeshGrpcMonitor.java |   5 +-
 .../runtime/metrics/http/HTTPMetricsServer.java    | 102 +++++++++------------
 .../runtime/util/EventMeshThreadFactoryImpl.java   |  49 ----------
 .../eventmesh/runtime/util/EventMeshUtil.java      |   3 +-
 .../eventmesh/runtime/client/common/TCPClient.java |  38 ++------
 .../eventmesh/runtime/util/EventMeshUtilTest.java  |   3 +-
 .../grpc/consumer/EventMeshGrpcConsumer.java       |   9 +-
 .../http/consumer/EventMeshHttpConsumer.java       |  38 ++++----
 .../eventmesh/client/tcp/common/TcpClient.java     |  23 ++---
 26 files changed, 285 insertions(+), 370 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java
new file mode 100644
index 000000000..f4c9b3ff4
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import lombok.Getter;
+
+public class EventMeshThreadFactory implements ThreadFactory {
+
+    @Getter
+    private final String threadNamePrefix;
+    private final AtomicInteger threadIndex;
+    private final boolean daemon;
+    private Integer priority;
+
+    public EventMeshThreadFactory(final String threadNamePrefix, final AtomicInteger threadIndex, final boolean daemon,
+        final Integer priority) {
+        this.threadNamePrefix = threadNamePrefix;
+        this.threadIndex = threadIndex;
+        this.daemon = daemon;
+        this.priority = priority;
+    }
+
+    public EventMeshThreadFactory(final String threadNamePrefix, final AtomicInteger threadIndex,
+        final boolean daemon) {
+        this(threadNamePrefix, threadIndex, daemon, null);
+    }
+
+    public EventMeshThreadFactory(final String threadNamePrefix, final boolean daemon, final Integer priority) {
+        this(threadNamePrefix, new AtomicInteger(0), daemon, priority);
+    }
+
+    public EventMeshThreadFactory(final String threadNamePrefix, final boolean daemon) {
+        this(threadNamePrefix, new AtomicInteger(0), daemon);
+    }
+
+    public EventMeshThreadFactory(final String threadNamePrefix) {
+        this(threadNamePrefix, new AtomicInteger(0), false);
+    }
+
+    /**
+     * Constructs a new {@code Thread}.  Implementations may also initialize
+     * priority, name, daemon status, {@code ThreadGroup}, etc.
+     *
+     * @param runnable a runnable to be executed by new thread instance
+     * @return constructed thread, or {@code null} if the request to
+     * create a thread is rejected
+     */
+    @Override
+    public Thread newThread(final Runnable runnable) {
+
+        StringBuilder threadName = new StringBuilder(threadNamePrefix);
+        if (null != threadIndex) {
+            threadName.append("-").append(threadIndex.incrementAndGet());
+        }
+        Thread thread = new Thread(runnable, threadName.toString());
+        thread.setDaemon(daemon);
+        if (null != priority) {
+            thread.setPriority(priority);
+        }
+
+        return thread;
+    }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadPoolFactory.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadPoolFactory.java
index 8b6e4379d..551d92c9a 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadPoolFactory.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadPoolFactory.java
@@ -24,67 +24,36 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+public abstract class ThreadPoolFactory {
 
-public class ThreadPoolFactory {
+    private ThreadPoolFactory() {
+    }
 
     public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, final String threadName) {
         return createThreadPoolExecutor(core, max, threadName, true);
     }
 
     public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, final String threadName,
-                                                              final boolean isDaemon) {
+        final boolean isDaemon) {
         return createThreadPoolExecutor(core, max, new LinkedBlockingQueue<>(1000), threadName, isDaemon);
     }
 
     public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, BlockingQueue<Runnable> blockingQueue,
-                                                              final String threadName, final boolean isDaemon) {
+        final String threadName, final boolean isDaemon) {
         return new ThreadPoolExecutor(core, max, 10 * 1000, TimeUnit.MILLISECONDS, blockingQueue,
-                new ThreadFactoryBuilder().setNameFormat(threadName).setDaemon(isDaemon).build()
-        );
-    }
-
-    public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, ThreadFactory threadFactory) {
-        return createThreadPoolExecutor(core, max, new LinkedBlockingQueue<>(1000), threadFactory);
+            new EventMeshThreadFactory(threadName, isDaemon));
     }
 
     public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, BlockingQueue<Runnable> blockingQueue,
-                                                              ThreadFactory threadFactory) {
+        ThreadFactory threadFactory) {
         return new ThreadPoolExecutor(core, max, 10 * 1000, TimeUnit.MILLISECONDS, blockingQueue, threadFactory);
     }
 
     public static ScheduledExecutorService createSingleScheduledExecutor(final String threadName) {
-        return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-            private AtomicInteger ai = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r, threadName + ai.incrementAndGet());
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
+        return Executors.newSingleThreadScheduledExecutor(new EventMeshThreadFactory(threadName, true));
     }
 
-    public static ScheduledExecutorService createScheduledExecutor(int core, final String threadName) {
-        return createScheduledExecutor(core, threadName, true);
-    }
-
-    public static ScheduledExecutorService createScheduledExecutor(int core, final String threadName,
-                                                                   final boolean isDaemon) {
-        return Executors.newScheduledThreadPool(core, new ThreadFactory() {
-            private AtomicInteger ai = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r, threadName + ai.incrementAndGet());
-                thread.setDaemon(isDaemon);
-                return thread;
-            }
-        });
-    }
 
     public static ScheduledExecutorService createScheduledExecutor(int core, ThreadFactory threadFactory) {
         return Executors.newScheduledThreadPool(core, threadFactory);
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImplTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/EventMeshThreadFactoryTest.java
similarity index 73%
rename from eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImplTest.java
rename to eventmesh-common/src/test/java/org/apache/eventmesh/common/EventMeshThreadFactoryTest.java
index f9b482185..944493260 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImplTest.java
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/EventMeshThreadFactoryTest.java
@@ -15,25 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.util;
+package org.apache.eventmesh.common;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-public class EventMeshThreadFactoryImplTest {
+public class EventMeshThreadFactoryTest {
 
     @Test
     public void testGetThreadNamePrefix() {
         final String threadNamePrefix = "threadNamePrefix";
-        EventMeshThreadFactoryImpl factory = new EventMeshThreadFactoryImpl(threadNamePrefix, false);
+        EventMeshThreadFactory factory = new EventMeshThreadFactory(threadNamePrefix, false);
         Assert.assertEquals(threadNamePrefix, factory.getThreadNamePrefix());
     }
 
     @Test
     public void testNewThread() {
         final String threadNamePrefix = "threadNamePrefix";
-        EventMeshThreadFactoryImpl factory = new EventMeshThreadFactoryImpl(threadNamePrefix, true);
-        Thread t = factory.newThread(() -> {});
+        EventMeshThreadFactory factory = new EventMeshThreadFactory(threadNamePrefix, true);
+        Thread t = factory.newThread(() -> {
+
+        });
         Assert.assertNotNull(t);
+        Assert.assertEquals(threadNamePrefix + "-1", t.getName());
+        Assert.assertTrue(t.isDaemon());
     }
 }
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/consumer/RabbitmqConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/consumer/RabbitmqConsumer.java
index b2da7db84..6e1a77d60 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/consumer/RabbitmqConsumer.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/consumer/RabbitmqConsumer.java
@@ -61,7 +61,7 @@ public class RabbitmqConsumer implements Consumer {
     private final ThreadPoolExecutor executor = ThreadPoolFactory.createThreadPoolExecutor(
             Runtime.getRuntime().availableProcessors() * 2,
             Runtime.getRuntime().availableProcessors() * 2,
-            "EventMesh-Rabbitmq-Consumer-");
+            "EventMesh-Rabbitmq-Consumer");
 
     private RabbitmqConsumerHandler rabbitmqConsumerHandler;
 
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 9d5f408c2..5cfe1bc73 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.impl.consumer;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.connector.rocketmq.patch.EventMeshConsumeConcurrentlyContext;
 
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -28,7 +29,6 @@ import org.apache.rocketmq.client.hook.ConsumeMessageContext;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.stat.ConsumerStatsManager;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -81,10 +81,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
             1000 * 60,
             TimeUnit.MILLISECONDS,
             this.consumeRequestQueue,
-            new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroup + "_"));
+            new EventMeshThreadFactory("ConsumeMessageThread_" + consumerGroup + "_"));
 
-        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
-        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new EventMeshThreadFactory("ConsumeMessageScheduledThread_"));
+        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new EventMeshThreadFactory("CleanExpireMsgScheduledThread_"));
 
         log.info("new ConsumeMessageConcurrentlyService instance for eventMesh has been created ");
     }
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java
index 68a9ce7d2..85a7e4e72 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java
@@ -18,7 +18,6 @@
 package org.apache.eventmesh.grpc.pub.eventmeshmessage;
 
 import org.apache.eventmesh.client.grpc.producer.EventMeshGrpcProducer;
-import org.apache.eventmesh.common.EventMeshMessage;
 import org.apache.eventmesh.common.ExampleConstants;
 import org.apache.eventmesh.grpc.GrpcAbstractDemo;
 
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java
index 64c4919b6..5b179b1b9 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java
@@ -17,23 +17,16 @@
 
 package org.apache.eventmesh.http.demo.pub.eventmeshmessage;
 
-import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
 import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.EventMeshMessage;
 import org.apache.eventmesh.common.ExampleConstants;
-import org.apache.eventmesh.common.utils.IPUtils;
 import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.common.utils.RandomStringUtils;
-import org.apache.eventmesh.common.utils.ThreadUtils;
 import org.apache.eventmesh.http.demo.HttpAbstractDemo;
-import org.apache.eventmesh.util.Utils;
-
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
 import lombok.extern.slf4j.Slf4j;
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index 9f00023fc..f1b23d5f4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -121,7 +121,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
     private final transient EventMeshHTTPConfiguration eventMeshHttpConfiguration;
 
     private final transient ThreadPoolExecutor asyncContextCompleteHandler =
-            ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-");
+            ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext");
 
     private static final int MAX_CONNECTIONS = 20_000;
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
index 71742cfa9..603990025 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
@@ -17,11 +17,9 @@
 
 package org.apache.eventmesh.runtime.boot;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.utils.ThreadUtils;
 
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,41 +74,17 @@ public abstract class AbstractRemotingServer {
     }
 
     private EventLoopGroup initBossGroup(final String threadPrefix) {
-        bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
-            private final AtomicInteger count = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(final Runnable r) {
-                final Thread t = new Thread(r, threadPrefix + "-boss-" + count.incrementAndGet());
-                t.setDaemon(true);
-                return t;
-            }
-        });
-
+        bossGroup = new NioEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "-boss", true));
         return bossGroup;
     }
 
     private EventLoopGroup initIOGroup(final String threadPrefix, final int threadNum) {
-        ioGroup = new NioEventLoopGroup(threadNum, new ThreadFactory() {
-            private final AtomicInteger count = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(final Runnable r) {
-                return new Thread(r, threadPrefix + "-io-" + count.incrementAndGet());
-            }
-        });
+        ioGroup = new NioEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-io"));
         return ioGroup;
     }
 
     private EventLoopGroup initWorkerGroup(final String threadPrefix, final int threadNum) {
-        workerGroup = new NioEventLoopGroup(threadNum, new ThreadFactory() {
-            private final AtomicInteger count = new AtomicInteger(0);
-
-            @Override
-            public Thread newThread(final Runnable r) {
-                return new Thread(r, threadPrefix + "-worker-" + count.incrementAndGet());
-            }
-        });
+        workerGroup = new NioEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-worker"));
         return workerGroup;
     }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
index 089e66b17..7aa4439a3 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
@@ -245,7 +245,7 @@ public class EventMeshGrpcServer {
         sendMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshGrpcConfiguration.getEventMeshServerSendMsgThreadNum(),
                 eventMeshGrpcConfiguration.getEventMeshServerSendMsgThreadNum(), sendMsgThreadPoolQueue,
-                "eventMesh-grpc-sendMsg-%d", true);
+                "eventMesh-grpc-sendMsg", true);
 
         BlockingQueue<Runnable> subscribeMsgThreadPoolQueue =
             new LinkedBlockingQueue<Runnable>(eventMeshGrpcConfiguration.getEventMeshServerSubscribeMsgBlockQueueSize());
@@ -253,7 +253,7 @@ public class EventMeshGrpcServer {
         clientMgmtExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshGrpcConfiguration.getEventMeshServerSubscribeMsgThreadNum(),
                 eventMeshGrpcConfiguration.getEventMeshServerSubscribeMsgThreadNum(), subscribeMsgThreadPoolQueue,
-                "eventMesh-grpc-clientMgmt-%d", true);
+                "eventMesh-grpc-clientMgmt", true);
 
         BlockingQueue<Runnable> pushMsgThreadPoolQueue =
             new LinkedBlockingQueue<Runnable>(eventMeshGrpcConfiguration.getEventMeshServerPushMsgBlockQueueSize());
@@ -261,12 +261,12 @@ public class EventMeshGrpcServer {
         pushMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshGrpcConfiguration.getEventMeshServerPushMsgThreadNum(),
                 eventMeshGrpcConfiguration.getEventMeshServerPushMsgThreadNum(), pushMsgThreadPoolQueue,
-                "eventMesh-grpc-pushMsg-%d", true);
+                "eventMesh-grpc-pushMsg", true);
 
         replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshGrpcConfiguration.getEventMeshServerReplyMsgThreadNum(),
                 eventMeshGrpcConfiguration.getEventMeshServerReplyMsgThreadNum(), sendMsgThreadPoolQueue,
-                "eventMesh-grpc-replyMsg-%d", true);
+                "eventMesh-grpc-replyMsg", true);
     }
 
     private void initHttpClientPool() {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 77fd38771..5a08d6334 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -142,44 +142,44 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
         batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshHttpConfiguration.getEventMeshServerBatchMsgThreadNum(),
                 eventMeshHttpConfiguration.getEventMeshServerBatchMsgThreadNum(),
-                new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.getEventMeshServerBatchBlockQSize()),
-                "eventMesh-batchMsg-", true);
+                new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerBatchBlockQSize()),
+                "eventMesh-batchMsg", true);
 
         sendMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshHttpConfiguration.getEventMeshServerSendMsgThreadNum(),
                 eventMeshHttpConfiguration.getEventMeshServerSendMsgThreadNum(),
-                new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.getEventMeshServerSendMsgBlockQSize()),
-                "eventMesh-sendMsg-", true);
+                new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerSendMsgBlockQSize()),
+                "eventMesh-sendMsg", true);
 
         remoteMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshHttpConfiguration.getEventMeshServerRemoteMsgThreadNum(),
                 eventMeshHttpConfiguration.getEventMeshServerRemoteMsgThreadNum(),
-                new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.getEventMeshServerRemoteMsgBlockQSize()),
-                "eventMesh-remoteMsg-", true);
+                new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerRemoteMsgBlockQSize()),
+                "eventMesh-remoteMsg", true);
 
         pushMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshHttpConfiguration.getEventMeshServerPushMsgThreadNum(),
                 eventMeshHttpConfiguration.getEventMeshServerPushMsgThreadNum(),
-                new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.getEventMeshServerPushMsgBlockQSize()),
-                "eventMesh-pushMsg-", true);
+                new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerPushMsgBlockQSize()),
+                "eventMesh-pushMsg", true);
 
         clientManageExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshHttpConfiguration.getEventMeshServerClientManageThreadNum(),
                 eventMeshHttpConfiguration.getEventMeshServerClientManageThreadNum(),
-                new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.getEventMeshServerClientManageBlockQSize()),
-                "eventMesh-clientManage-", true);
+                new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerClientManageBlockQSize()),
+                "eventMesh-clientManage", true);
 
         adminExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshHttpConfiguration.getEventMeshServerAdminThreadNum(),
                 eventMeshHttpConfiguration.getEventMeshServerAdminThreadNum(),
-                new LinkedBlockingQueue<Runnable>(50), "eventMesh-admin-",
+                new LinkedBlockingQueue<>(50), "eventMesh-admin",
                 true);
 
         replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshHttpConfiguration.getEventMeshServerReplyMsgThreadNum(),
                 eventMeshHttpConfiguration.getEventMeshServerReplyMsgThreadNum(),
-                new LinkedBlockingQueue<Runnable>(100),
-                "eventMesh-replyMsg-", true);
+                new LinkedBlockingQueue<>(100),
+                "eventMesh-replyMsg", true);
     }
 
     public ThreadPoolExecutor getBatchMsgExecutor() {
@@ -397,7 +397,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
         webhookExecutor = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshHttpConfiguration.getEventMeshServerWebhookThreadNum(),
                 eventMeshHttpConfiguration.getEventMeshServerWebhookThreadNum(),
-                new LinkedBlockingQueue<Runnable>(100), "eventMesh-webhook-", true);
+                new LinkedBlockingQueue<>(100), "eventMesh-webhook", true);
         final WebHookProcessor webHookProcessor = new WebHookProcessor();
 
         final WebHookController webHookController = new WebHookController();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index 1bdce7b14..f4f89ada1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.runtime.boot;
 
 import org.apache.eventmesh.api.registry.dto.EventMeshRegisterInfo;
 import org.apache.eventmesh.api.registry.dto.EventMeshUnRegisterInfo;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.ThreadPoolFactory;
 import org.apache.eventmesh.common.exception.EventMeshException;
 import org.apache.eventmesh.common.protocol.tcp.codec.Codec;
@@ -37,7 +38,6 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.Eventmesh
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
 import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
 import org.apache.eventmesh.runtime.registry.Registry;
-import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
 import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManager;
 
 import java.util.List;
@@ -334,19 +334,19 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         super.init("eventMesh-tcp");
 
         scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.eventMeshTcpGlobalScheduler,
-                new EventMeshThreadFactoryImpl("eventMesh-tcp-scheduler", true));
+                new EventMeshThreadFactory("eventMesh-tcp-scheduler", true));
 
         taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
                 eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
                 new LinkedBlockingQueue<>(10_000),
-                new EventMeshThreadFactoryImpl("eventMesh-tcp-task-handle", true));
+                new EventMeshThreadFactory("eventMesh-tcp-task-handle", true));
 
         broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
                 eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
                 eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
                 new LinkedBlockingQueue<>(10_000),
-                new EventMeshThreadFactoryImpl("eventMesh-tcp-msg-downstream", true));
+                new EventMeshThreadFactory("eventMesh-tcp-msg-downstream", true));
     }
 
     private void shutdownThreadPool() {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
index 0022e24ff..2ad6cce02 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
@@ -17,15 +17,14 @@
 
 package org.apache.eventmesh.runtime.core.protocol.grpc.retry;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
 import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,20 +57,11 @@ public class GrpcRetryer {
 
     public void init() {
         pool = new ThreadPoolExecutor(
-                grpcConfiguration.getEventMeshServerRetryThreadNum(),
-                grpcConfiguration.getEventMeshServerRetryThreadNum(), 60000, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<>(grpcConfiguration.getEventMeshServerRetryBlockQueueSize()),
-                new ThreadFactory() {
-                    private AtomicInteger count = new AtomicInteger();
-
-                    @Override
-                    public Thread newThread(Runnable r) {
-                        Thread thread = new Thread(r, "grpc-retry-" + count.incrementAndGet());
-                        thread.setPriority(Thread.NORM_PRIORITY);
-                        thread.setDaemon(true);
-                        return thread;
-                    }
-                }, new ThreadPoolExecutor.AbortPolicy());
+            grpcConfiguration.getEventMeshServerRetryThreadNum(),
+            grpcConfiguration.getEventMeshServerRetryThreadNum(), 60000, TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(grpcConfiguration.getEventMeshServerRetryBlockQueueSize()),
+            new EventMeshThreadFactory("grpc-retry", true, Thread.NORM_PRIORITY),
+            new ThreadPoolExecutor.AbortPolicy());
 
         dispatcher = new Thread(() -> {
             try {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 4e060e3a4..ceafe19d2 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -85,9 +85,9 @@ public class EventMeshConsumer {
         this.eventMeshHTTPServer = eventMeshHTTPServer;
         this.consumerGroupConf = consumerGroupConf;
         this.persistentMqConsumer = new MQConsumerWrapper(
-                eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshConnectorPluginType());
+            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshConnectorPluginType());
         this.broadcastMqConsumer = new MQConsumerWrapper(
-                eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshConnectorPluginType());
+            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshConnectorPluginType());
     }
 
     private MessageHandler httpMessageHandler;
@@ -99,26 +99,26 @@ public class EventMeshConsumer {
         keyValue.put(CONSUMER_GROUP, consumerGroupConf.getConsumerGroup());
         keyValue.put(EVENT_MESH_IDC, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
         keyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
-                eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
+            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
         persistentMqConsumer.init(keyValue);
 
         EventListener clusterEventListener = (event, context) -> {
             String protocolVersion =
-                    Objects.requireNonNull(event.getSpecVersion()).toString();
+                Objects.requireNonNull(event.getSpecVersion()).toString();
 
             Span span = TraceUtils.prepareServerSpan(
-                    EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
-                    EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
+                EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
+                EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
             try {
                 String topic = event.getSubject();
                 String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO)).toString();
                 String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID)).toString();
 
                 event = CloudEventBuilder.from(event)
-                        .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
-                        .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
-                                eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
-                        .build();
+                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
+                    .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
+                    .build();
                 if (messageLogger.isDebugEnabled()) {
                     messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
                 } else {
@@ -126,12 +126,12 @@ public class EventMeshConsumer {
                 }
 
                 ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
-                        topic, null);
+                    topic, null);
                 EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
 
                 if (currentTopicConfig == null) {
                     logger.error("no topicConfig found, consumerGroup:{} topic:{}",
-                            consumerGroupConf.getConsumerGroup(), topic);
+                        consumerGroupConf.getConsumerGroup(), topic);
                     try {
                         sendMessageBack(event, uniqueId, bizSeqNo);
                         eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
@@ -142,13 +142,13 @@ public class EventMeshConsumer {
                 }
 
                 SubscriptionItem subscriptionItem =
-                        consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
+                    consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
                 HandleMsgContext handleMsgContext = new HandleMsgContext(
-                        EventMeshUtil.buildPushMsgSeqNo(),
-                        consumerGroupConf.getConsumerGroup(),
-                        EventMeshConsumer.this,
-                        topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
-                        consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+                    EventMeshUtil.buildPushMsgSeqNo(),
+                    consumerGroupConf.getConsumerGroup(),
+                    EventMeshConsumer.this,
+                    topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
+                    consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
 
                 if (httpMessageHandler.handle(handleMsgContext)) {
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
@@ -172,25 +172,25 @@ public class EventMeshConsumer {
         broadcastKeyValue.put(CONSUMER_GROUP, consumerGroupConf.getConsumerGroup());
         broadcastKeyValue.put(EVENT_MESH_IDC, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
         broadcastKeyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
-                eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
+            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
         broadcastMqConsumer.init(broadcastKeyValue);
 
         EventListener broadcastEventListener = (event, context) -> {
 
             String protocolVersion =
-                    Objects.requireNonNull(event.getSpecVersion()).toString();
+                Objects.requireNonNull(event.getSpecVersion()).toString();
 
             Span span = TraceUtils.prepareServerSpan(
-                    EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
-                    EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
+                EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
+                EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
             try {
 
                 event = CloudEventBuilder.from(event)
-                        .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
-                                String.valueOf(System.currentTimeMillis()))
-                        .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
-                                eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
-                        .build();
+                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+                        String.valueOf(System.currentTimeMillis()))
+                    .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
+                    .build();
 
                 String topic = event.getSubject();
                 String bizSeqNo = getEventExtension(event, ProtocolKey.ClientInstanceKey.BIZSEQNO, "");
@@ -200,18 +200,18 @@ public class EventMeshConsumer {
                     messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
                 } else {
                     messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}",
-                            topic, bizSeqNo,
-                            uniqueId);
+                        topic, bizSeqNo,
+                        uniqueId);
                 }
 
                 ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
-                        consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
+                    consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
                 EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
-                        (EventMeshAsyncConsumeContext) context;
+                    (EventMeshAsyncConsumeContext) context;
 
                 if (currentTopicConfig == null) {
                     logger.error("no topicConfig found, consumerGroup:{} topic:{}",
-                            consumerGroupConf.getConsumerGroup(), topic);
+                        consumerGroupConf.getConsumerGroup(), topic);
                     try {
                         sendMessageBack(event, uniqueId, bizSeqNo);
                         eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
@@ -222,15 +222,15 @@ public class EventMeshConsumer {
                 }
 
                 SubscriptionItem subscriptionItem =
-                        consumerGroupConf.getConsumerGroupTopicConf().get(topic)
-                                .getSubscriptionItem();
+                    consumerGroupConf.getConsumerGroupTopicConf().get(topic)
+                        .getSubscriptionItem();
                 HandleMsgContext handleMsgContext =
-                        new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
-                                consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
-                                topic, event, subscriptionItem,
-                                eventMeshAsyncConsumeContext.getAbstractContext(),
-                                consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
-                                currentTopicConfig);
+                    new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
+                        consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
+                        topic, event, subscriptionItem,
+                        eventMeshAsyncConsumeContext.getAbstractContext(),
+                        consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
+                        currentTopicConfig);
 
                 if (httpMessageHandler.handle(handleMsgContext)) {
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
@@ -312,16 +312,16 @@ public class EventMeshConsumer {
     public void sendMessageBack(final CloudEvent event, final String uniqueId, String bizSeqNo) throws Exception {
 
         EventMeshProducer sendMessageBack
-                = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup());
+            = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup());
 
         if (sendMessageBack == null) {
             logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}",
-                    consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
+                consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
             return;
         }
 
         final SendMessageContext sendMessageBackContext = new SendMessageContext(bizSeqNo, event, sendMessageBack,
-                eventMeshHTTPServer);
+            eventMeshHTTPServer);
 
         sendMessageBack.send(sendMessageBackContext, new SendCallback() {
             @Override
@@ -331,7 +331,7 @@ public class EventMeshConsumer {
             @Override
             public void onException(OnExceptionContext context) {
                 logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
-                        consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
+                    consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
             }
         });
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
index b7faae46a..f87ab0501 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
@@ -17,14 +17,13 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.retry;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,22 +56,12 @@ public class HttpRetryer {
 
     public void init() {
         pool = new ThreadPoolExecutor(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
-                eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
-                60000,
-                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(
-                    eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryBlockQSize()),
-                new ThreadFactory() {
-                    private AtomicInteger count = new AtomicInteger();
-
-                    @Override
-                    public Thread newThread(Runnable r) {
-                        Thread thread = new Thread(r, "http-retry-" + count.incrementAndGet());
-                        thread.setPriority(Thread.NORM_PRIORITY);
-                        thread.setDaemon(true);
-                        return thread;
-                    }
-                },
-                new ThreadPoolExecutor.AbortPolicy());
+            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
+            60000,
+            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(
+            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryBlockQSize()),
+            new EventMeshThreadFactory("http-retry", true, Thread.NORM_PRIORITY),
+            new ThreadPoolExecutor.AbortPolicy());
 
         dispatcher = new Thread(() -> {
             try {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
index 560cf40e2..48406c38a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
@@ -17,9 +17,9 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.ThreadPoolFactory;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 
 import java.util.concurrent.ScheduledExecutorService;
@@ -47,8 +47,8 @@ public class EventMeshRebalanceService {
     }
 
     public void init() {
-        this.serviceRebalanceScheduler = ThreadPoolFactory.createScheduledExecutor(5, new EventMeshThreadFactoryImpl("proxy-rebalance-sch", true));
-        logger.info("rebalance service inited......");
+        this.serviceRebalanceScheduler = ThreadPoolFactory.createScheduledExecutor(5, new EventMeshThreadFactory("proxy-rebalance-sch", true));
+        logger.info("rebalance service inited ......");
     }
 
     public void start() throws Exception {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
index 6108d8961..8f6d2e8ad 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
@@ -17,10 +17,10 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.protocol.SubscriptionType;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
-import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 
 import java.util.concurrent.ArrayBlockingQueue;
@@ -43,7 +43,7 @@ public class EventMeshTcpRetryer {
         3,
         60000,
         TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000),
-        new EventMeshThreadFactoryImpl("eventMesh-tcp-retry", true),
+        new EventMeshThreadFactory("eventMesh-tcp-retry", true),
         new ThreadPoolExecutor.AbortPolicy());
 
     private Thread dispatcher;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
index 5d06abf3e..fe9fca4bc 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
@@ -17,11 +17,12 @@
 
 package org.apache.eventmesh.runtime.metrics.grpc;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.ThreadPoolFactory;
 import org.apache.eventmesh.metrics.api.MetricsRegistry;
 import org.apache.eventmesh.metrics.api.model.GrpcSummaryMetrics;
 import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
-import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
+
 
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
@@ -49,7 +50,7 @@ public class EventMeshGrpcMonitor {
         this.metricsRegistries = Preconditions.checkNotNull(metricsRegistries);
         this.grpcSummaryMetrics = new GrpcSummaryMetrics();
         this.scheduler = ThreadPoolFactory.createScheduledExecutor(SCHEDULE_THREAD_SIZE,
-            new EventMeshThreadFactoryImpl(THREAD_NAME_PREFIX, true));
+            new EventMeshThreadFactory(THREAD_NAME_PREFIX, true));
     }
 
     public void init() throws Exception {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
index 7ceecc08e..2b3707ead 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.runtime.metrics.http;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.metrics.api.MetricsRegistry;
 import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
@@ -25,9 +26,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,17 +43,17 @@ public class HTTPMetricsServer {
     private final transient HttpSummaryMetrics summaryMetrics;
 
     public HTTPMetricsServer(final EventMeshHTTPServer eventMeshHTTPServer,
-                             final List<MetricsRegistry> metricsRegistries) {
+        final List<MetricsRegistry> metricsRegistries) {
         Objects.requireNonNull(eventMeshHTTPServer, "EventMeshHTTPServer can not be null");
         Objects.requireNonNull(metricsRegistries, "List<MetricsRegistry> can not be null");
 
         this.eventMeshHTTPServer = eventMeshHTTPServer;
         this.metricsRegistries = metricsRegistries;
         this.summaryMetrics = new HttpSummaryMetrics(
-                eventMeshHTTPServer.batchMsgExecutor,
-                eventMeshHTTPServer.sendMsgExecutor,
-                eventMeshHTTPServer.pushMsgExecutor,
-                eventMeshHTTPServer.getHttpRetryer().getFailedQueue());
+            eventMeshHTTPServer.batchMsgExecutor,
+            eventMeshHTTPServer.sendMsgExecutor,
+            eventMeshHTTPServer.pushMsgExecutor,
+            eventMeshHTTPServer.getHttpRetryer().getFailedQueue());
 
         init();
     }
@@ -105,96 +105,82 @@ public class HTTPMetricsServer {
         }
     }
 
-    private static ScheduledExecutorService metricsSchedule = Executors.newScheduledThreadPool(2, new ThreadFactory() {
-        private final transient AtomicInteger seq = new AtomicInteger(0);
-
-        @Override
-        public Thread newThread(final Runnable r) {
-            seq.incrementAndGet();
-            final Thread t = new Thread(r, "eventMesh-metrics-" + seq.get());
-            t.setDaemon(true);
-            return t;
-        }
-    });
+    private static ScheduledExecutorService metricsSchedule = Executors.newScheduledThreadPool(2,
+        new EventMeshThreadFactory("eventMesh-metrics", true));
 
     // todo: move this into standalone metrics plugin
 
     private void logPrintServerMetrics(final HttpSummaryMetrics summaryMetrics,
-                                       final EventMeshHTTPServer eventMeshHTTPServer) {
+        final EventMeshHTTPServer eventMeshHTTPServer) {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("===========================================SERVER METRICS==================================================");
 
             LOGGER.info("maxHTTPTPS: {}, avgHTTPTPS: {}, maxHTTPCOST: {}, avgHTTPCOST: {}, avgHTTPBodyDecodeCost: {}, httpDiscard: {}",
-                    summaryMetrics.maxHTTPTPS(),
-                    summaryMetrics.avgHTTPTPS(),
-                    summaryMetrics.maxHTTPCost(),
-                    summaryMetrics.avgHTTPCost(),
-                    summaryMetrics.avgHTTPBodyDecodeCost(),
-                    summaryMetrics.getHttpDiscard());
+                summaryMetrics.maxHTTPTPS(),
+                summaryMetrics.avgHTTPTPS(),
+                summaryMetrics.maxHTTPCost(),
+                summaryMetrics.avgHTTPCost(),
+                summaryMetrics.avgHTTPBodyDecodeCost(),
+                summaryMetrics.getHttpDiscard());
         }
 
         summaryMetrics.httpStatInfoClear();
 
-
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("maxBatchSendMsgTPS: {}, avgBatchSendMsgTPS: {}, sum: {}. sumFail: {}, sumFailRate: {}, discard : {}",
-                    summaryMetrics.maxSendBatchMsgTPS(),
-                    summaryMetrics.avgSendBatchMsgTPS(),
-                    summaryMetrics.getSendBatchMsgNumSum(),
-                    summaryMetrics.getSendBatchMsgFailNumSum(),
-                    summaryMetrics.getSendBatchMsgFailRate(),
-                    summaryMetrics.getSendBatchMsgDiscardNumSum()
+                summaryMetrics.maxSendBatchMsgTPS(),
+                summaryMetrics.avgSendBatchMsgTPS(),
+                summaryMetrics.getSendBatchMsgNumSum(),
+                summaryMetrics.getSendBatchMsgFailNumSum(),
+                summaryMetrics.getSendBatchMsgFailRate(),
+                summaryMetrics.getSendBatchMsgDiscardNumSum()
             );
         }
 
         summaryMetrics.cleanSendBatchStat();
 
-
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("maxSendMsgTPS: {}, avgSendMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, replyMsg: {}, replyFail: {}",
-                    summaryMetrics.maxSendMsgTPS(),
-                    summaryMetrics.avgSendMsgTPS(),
-                    summaryMetrics.getSendMsgNumSum(),
-                    summaryMetrics.getSendMsgFailNumSum(),
-                    summaryMetrics.getSendMsgFailRate(),
-                    summaryMetrics.getReplyMsgNumSum(),
-                    summaryMetrics.getReplyMsgFailNumSum()
+                summaryMetrics.maxSendMsgTPS(),
+                summaryMetrics.avgSendMsgTPS(),
+                summaryMetrics.getSendMsgNumSum(),
+                summaryMetrics.getSendMsgFailNumSum(),
+                summaryMetrics.getSendMsgFailRate(),
+                summaryMetrics.getReplyMsgNumSum(),
+                summaryMetrics.getReplyMsgFailNumSum()
             );
         }
 
         summaryMetrics.cleanSendMsgStat();
 
-
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info(
-                    "maxPushMsgTPS: {}, avgPushMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, maxClientLatency: {}, avgClientLatency: {}",
-                    summaryMetrics.maxPushMsgTPS(),
-                    summaryMetrics.avgPushMsgTPS(),
-                    summaryMetrics.getHttpPushMsgNumSum(),
-                    summaryMetrics.getHttpPushFailNumSum(),
-                    summaryMetrics.getHttpPushMsgFailRate(),
-                    summaryMetrics.maxHTTPPushLatency(),
-                    summaryMetrics.avgHTTPPushLatency()
+                "maxPushMsgTPS: {}, avgPushMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, maxClientLatency: {}, avgClientLatency: {}",
+                summaryMetrics.maxPushMsgTPS(),
+                summaryMetrics.avgPushMsgTPS(),
+                summaryMetrics.getHttpPushMsgNumSum(),
+                summaryMetrics.getHttpPushFailNumSum(),
+                summaryMetrics.getHttpPushMsgFailRate(),
+                summaryMetrics.maxHTTPPushLatency(),
+                summaryMetrics.avgHTTPPushLatency()
             );
         }
 
         summaryMetrics.cleanHttpPushMsgStat();
 
-
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("batchMsgQ: {}, sendMsgQ: {}, pushMsgQ: {}, httpRetryQ: {}",
-                    eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size(),
-                    eventMeshHTTPServer.getSendMsgExecutor().getQueue().size(),
-                    eventMeshHTTPServer.getPushMsgExecutor().getQueue().size(),
-                    eventMeshHTTPServer.getHttpRetryer().size());
+                eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size(),
+                eventMeshHTTPServer.getSendMsgExecutor().getQueue().size(),
+                eventMeshHTTPServer.getPushMsgExecutor().getQueue().size(),
+                eventMeshHTTPServer.getHttpRetryer().size());
         }
 
-
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("batchAvgSend2MQCost: {}, avgSend2MQCost: {}, avgReply2MQCost: {}",
-                    summaryMetrics.avgBatchSendMsgCost(),
-                    summaryMetrics.avgSendMsgCost(),
-                    summaryMetrics.avgReplyMsgCost());
+                summaryMetrics.avgBatchSendMsgCost(),
+                summaryMetrics.avgSendMsgCost(),
+                summaryMetrics.avgReplyMsgCost());
         }
         summaryMetrics.send2MQStatInfoClear();
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImpl.java
deleted file mode 100644
index cf8390411..000000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.util;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class EventMeshThreadFactoryImpl implements ThreadFactory {
-    private final AtomicLong threadIndex = new AtomicLong(0);
-    private final String threadNamePrefix;
-    private Boolean isDaemonSpecified = null;
-
-    public EventMeshThreadFactoryImpl(final String threadNamePrefix) {
-        this.threadNamePrefix = threadNamePrefix;
-    }
-
-    public EventMeshThreadFactoryImpl(final String threadNamePrefix, final boolean isDaemonSpecified) {
-        this.threadNamePrefix = threadNamePrefix;
-        this.isDaemonSpecified = isDaemonSpecified;
-    }
-
-    public String getThreadNamePrefix() {
-        return threadNamePrefix;
-    }
-
-    @Override
-    public Thread newThread(Runnable r) {
-        Thread t = new Thread(r, threadNamePrefix + '-' + this.threadIndex.incrementAndGet());
-        if (isDaemonSpecified != null) {
-            t.setDaemon(isDaemonSpecified);
-        }
-        return t;
-    }
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index 1303529aa..badb953b8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.runtime.util;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.common.utils.RandomStringUtils;
@@ -292,7 +293,7 @@ public class EventMeshUtil {
 
     public static void printState(final ThreadPoolExecutor scheduledExecutorService) {
         if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("{} [{} {} {} {}]", ((EventMeshThreadFactoryImpl) scheduledExecutorService.getThreadFactory())
+            LOGGER.info("{} [{} {} {} {}]", ((EventMeshThreadFactory) scheduledExecutorService.getThreadFactory())
                     .getThreadNamePrefix(), scheduledExecutorService.getQueue().size(), scheduledExecutorService
                     .getPoolSize(), scheduledExecutorService.getActiveCount(), scheduledExecutorService
                     .getCompletedTaskCount());
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/TCPClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/TCPClient.java
index 0a14fee35..b660f6640 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/TCPClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/TCPClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.runtime.client.common;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 
 import java.io.Closeable;
@@ -25,10 +26,8 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +47,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 
-
 /**
  * one Client connects one ACCESS
  * Provides the most basic connection, send capability, and cannot provide disconnected reconnection capability,
@@ -70,27 +68,9 @@ public abstract class TCPClient implements Closeable {
     private Bootstrap bootstrap = new Bootstrap();
 
     protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4,
-            new ThreadFactory() {
-                AtomicInteger count = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    Thread t = new Thread(r, "TCPClientScheduler-" + count.incrementAndGet());
-                    t.setDaemon(true);
-                    return t;
-                }
-            }
-    );
+        new EventMeshThreadFactory("TCPClientScheduler", true));
 
-    private NioEventLoopGroup workers = new NioEventLoopGroup(8, new ThreadFactory() {
-        AtomicInteger count = new AtomicInteger(0);
-
-        @Override
-        public Thread newThread(Runnable r) {
-            Thread t = new Thread(r, "TCPClientWorker-" + count.incrementAndGet());
-            return t;
-        }
-    });
+    private NioEventLoopGroup workers = new NioEventLoopGroup(8, new EventMeshThreadFactory("TCPClientWorker"));
 
     public Channel channel;
 
@@ -141,15 +121,15 @@ public abstract class TCPClient implements Closeable {
         bootstrap.group(workers);
         bootstrap.channel(NioSocketChannel.class);
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1_000)
-                .option(ChannelOption.SO_KEEPALIVE, false)
-                .option(ChannelOption.SO_SNDBUF, 64 * 1024)
-                .option(ChannelOption.SO_RCVBUF, 64 * 1024)
-                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 8192, 65536))
-                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            .option(ChannelOption.SO_KEEPALIVE, false)
+            .option(ChannelOption.SO_SNDBUF, 64 * 1024)
+            .option(ChannelOption.SO_RCVBUF, 64 * 1024)
+            .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 8192, 65536))
+            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         bootstrap.handler(new ChannelInitializer<SocketChannel>() {
             public void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast(new Codec.Encoder(), new Codec.Decoder())
-                        .addLast(handler, newExceptionHandler());
+                    .addLast(handler, newExceptionHandler());
             }
         });
 
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshUtilTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshUtilTest.java
index 338b2708b..2c715b373 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshUtilTest.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshUtilTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.runtime.util;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.ThreadPoolFactory;
 import org.apache.eventmesh.common.exception.EventMeshException;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
@@ -167,7 +168,7 @@ public class EventMeshUtilTest {
     public void testPrintState() {
         try {
             ScheduledExecutorService serviceRebalanceScheduler = ThreadPoolFactory
-                    .createScheduledExecutor(5, new EventMeshThreadFactoryImpl("proxy-rebalance-sch", true));
+                    .createScheduledExecutor(5, new EventMeshThreadFactory("proxy-rebalance-sch", true));
             EventMeshUtil.printState((ThreadPoolExecutor) serviceRebalanceScheduler);
         } catch (Exception e) {
             Assert.fail(e.getMessage());
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
index ce3d186aa..deba03614 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
@@ -23,6 +23,7 @@ import static java.util.stream.Collectors.toList;
 import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
 import org.apache.eventmesh.client.grpc.util.EventMeshClientUtil;
 import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.SubscriptionType;
@@ -53,8 +54,6 @@ import java.util.stream.Collectors;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
@@ -68,8 +67,8 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
     private final transient Map<String, SubscriptionInfo> subscriptionMap = new ConcurrentHashMap<>();
 
     private final transient ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
-            Runtime.getRuntime().availableProcessors(),
-            new ThreadFactoryBuilder().setNameFormat("GRPCClientScheduler").setDaemon(true).build());
+        Runtime.getRuntime().availableProcessors(),
+        new EventMeshThreadFactory("GRPCClientScheduler", true));
 
     private transient ConsumerServiceBlockingStub consumerClient;
     private transient ConsumerServiceStub consumerAsyncClient;
@@ -84,7 +83,7 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
 
     public void init() {
         channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort())
-                .usePlaintext().build();
+            .usePlaintext().build();
 
         consumerClient = ConsumerServiceGrpc.newBlockingStub(channel);
         consumerAsyncClient = ConsumerServiceGrpc.newStub(channel);
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
index 2e8440237..bb8ec1e77 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
@@ -24,6 +24,7 @@ import org.apache.eventmesh.client.http.model.RequestParam;
 import org.apache.eventmesh.client.http.util.HttpUtils;
 import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
 import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.ThreadPoolFactory;
 import org.apache.eventmesh.common.exception.EventMeshException;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
@@ -49,8 +50,6 @@ import java.util.stream.Collectors;
 
 import io.netty.handler.codec.http.HttpMethod;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -71,14 +70,11 @@ public class EventMeshHttpConsumer extends AbstractHttpClient implements AutoClo
             throws EventMeshException {
         super(eventMeshHttpClientConfig);
         this.consumeExecutor = Optional.ofNullable(customExecutor).orElseGet(
-                () -> ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpClientConfig.getConsumeThreadCore(),
-                        eventMeshHttpClientConfig.getConsumeThreadMax(), "EventMesh-client-consume-")
-        );
-
-        this.scheduler = new ScheduledThreadPoolExecutor(
-                Runtime.getRuntime().availableProcessors(),
-                new ThreadFactoryBuilder().setNameFormat("HTTPClientScheduler").setDaemon(true).build()
+            () -> ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpClientConfig.getConsumeThreadCore(),
+                eventMeshHttpClientConfig.getConsumeThreadMax(), "EventMesh-client-consume")
         );
+        this.scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+            new EventMeshThreadFactory("HTTPClientScheduler", true));
     }
 
     /**
@@ -190,17 +186,17 @@ public class EventMeshHttpConsumer extends AbstractHttpClient implements AutoClo
 
     private RequestParam buildCommonRequestParam() {
         return new RequestParam(HttpMethod.POST)
-                .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpClientConfig.getEnv())
-                .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshHttpClientConfig.getIdc())
-                .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshHttpClientConfig.getIp())
-                .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshHttpClientConfig.getPid())
-                .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpClientConfig.getSys())
-                .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName())
-                .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword())
-                // add protocol version?
-                .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
-                .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
-                .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
-                .addBody(HeartbeatRequestBody.CONSUMERGROUP, eventMeshHttpClientConfig.getConsumerGroup());
+            .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpClientConfig.getEnv())
+            .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshHttpClientConfig.getIdc())
+            .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshHttpClientConfig.getIp())
+            .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshHttpClientConfig.getPid())
+            .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpClientConfig.getSys())
+            .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName())
+            .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword())
+            // add protocol version?
+            .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+            .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+            .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+            .addBody(HeartbeatRequestBody.CONSUMERGROUP, eventMeshHttpClientConfig.getConsumerGroup());
     }
 }
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
index fda7fbe5b..74b32c515 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
@@ -18,17 +18,20 @@
 package org.apache.eventmesh.client.tcp.common;
 
 import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.common.ThreadPoolFactory;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 import org.apache.eventmesh.common.protocol.tcp.codec.Codec;
 
+
 import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -49,7 +52,6 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -72,9 +74,8 @@ public abstract class TcpClient implements Closeable {
 
     private transient ScheduledFuture<?> heartTask;
 
-    protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
-            Runtime.getRuntime().availableProcessors(),
-            new ThreadFactoryBuilder().setNameFormat("TCPClientScheduler").setDaemon(true).build());
+    protected static final ScheduledExecutorService scheduler = ThreadPoolFactory.createScheduledExecutor(Runtime.getRuntime().availableProcessors(),
+        new EventMeshThreadFactory("TCPClientScheduler", true));
 
     public TcpClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) {
         Preconditions.checkNotNull(eventMeshTcpClientConfig, "EventMeshTcpClientConfig cannot be null");
@@ -89,16 +90,16 @@ public abstract class TcpClient implements Closeable {
         bootstrap.group(workers);
         bootstrap.channel(NioSocketChannel.class);
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1_000)
-                .option(ChannelOption.SO_KEEPALIVE, true)
-                .option(ChannelOption.SO_SNDBUF, 64 * 1024)
-                .option(ChannelOption.SO_RCVBUF, 64 * 1024)
-                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 8192, 65536))
-                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.SO_SNDBUF, 64 * 1024)
+            .option(ChannelOption.SO_RCVBUF, 64 * 1024)
+            .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 8192, 65536))
+            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         bootstrap.handler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) {
                 ch.pipeline().addLast(new Codec.Encoder(), new Codec.Decoder())
-                        .addLast(handler, newExceptionHandler());
+                    .addLast(handler, newExceptionHandler());
             }
         });
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org