You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/02/08 00:41:04 UTC
[incubator-eventmesh] branch master updated: [ISSUE #3028] Refactor ThreadFactory implement (#3044)
This is an automated email from the ASF dual-hosted git repository.
jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new b305df018 [ISSUE #3028] Refactor ThreadFactory implement (#3044)
b305df018 is described below
commit b305df0188ef7ea2e22c7a996dae766205908186
Author: mxsm <lj...@gmail.com>
AuthorDate: Wed Feb 8 08:40:59 2023 +0800
[ISSUE #3028] Refactor ThreadFactory implement (#3044)
LGTM
---
.../eventmesh/common/EventMeshThreadFactory.java | 81 ++++++++++++++++
.../apache/eventmesh/common/ThreadPoolFactory.java | 47 ++--------
.../common/EventMeshThreadFactoryTest.java | 14 ++-
.../rabbitmq/consumer/RabbitmqConsumer.java | 2 +-
.../ConsumeMessageConcurrentlyService.java | 8 +-
.../pub/eventmeshmessage/AsyncPublishInstance.java | 1 -
.../pub/eventmeshmessage/AsyncPublishInstance.java | 7 --
.../eventmesh/runtime/boot/AbstractHTTPServer.java | 2 +-
.../runtime/boot/AbstractRemotingServer.java | 34 +------
.../runtime/boot/EventMeshGrpcServer.java | 8 +-
.../runtime/boot/EventMeshHTTPServer.java | 28 +++---
.../eventmesh/runtime/boot/EventMeshTCPServer.java | 8 +-
.../core/protocol/grpc/retry/GrpcRetryer.java | 22 ++---
.../protocol/http/consumer/EventMeshConsumer.java | 88 +++++++++---------
.../core/protocol/http/retry/HttpRetryer.java | 25 ++---
.../rebalance/EventMeshRebalanceService.java | 6 +-
.../client/session/retry/EventMeshTcpRetryer.java | 4 +-
.../runtime/metrics/grpc/EventMeshGrpcMonitor.java | 5 +-
.../runtime/metrics/http/HTTPMetricsServer.java | 102 +++++++++------------
.../runtime/util/EventMeshThreadFactoryImpl.java | 49 ----------
.../eventmesh/runtime/util/EventMeshUtil.java | 3 +-
.../eventmesh/runtime/client/common/TCPClient.java | 38 ++------
.../eventmesh/runtime/util/EventMeshUtilTest.java | 3 +-
.../grpc/consumer/EventMeshGrpcConsumer.java | 9 +-
.../http/consumer/EventMeshHttpConsumer.java | 38 ++++----
.../eventmesh/client/tcp/common/TcpClient.java | 23 ++---
26 files changed, 285 insertions(+), 370 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java
new file mode 100644
index 000000000..f4c9b3ff4
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import lombok.Getter;
+
+public class EventMeshThreadFactory implements ThreadFactory {
+
+ @Getter
+ private final String threadNamePrefix;
+ private final AtomicInteger threadIndex;
+ private final boolean daemon;
+ private Integer priority;
+
+ public EventMeshThreadFactory(final String threadNamePrefix, final AtomicInteger threadIndex, final boolean daemon,
+ final Integer priority) {
+ this.threadNamePrefix = threadNamePrefix;
+ this.threadIndex = threadIndex;
+ this.daemon = daemon;
+ this.priority = priority;
+ }
+
+ public EventMeshThreadFactory(final String threadNamePrefix, final AtomicInteger threadIndex,
+ final boolean daemon) {
+ this(threadNamePrefix, threadIndex, daemon, null);
+ }
+
+ public EventMeshThreadFactory(final String threadNamePrefix, final boolean daemon, final Integer priority) {
+ this(threadNamePrefix, new AtomicInteger(0), daemon, priority);
+ }
+
+ public EventMeshThreadFactory(final String threadNamePrefix, final boolean daemon) {
+ this(threadNamePrefix, new AtomicInteger(0), daemon);
+ }
+
+ public EventMeshThreadFactory(final String threadNamePrefix) {
+ this(threadNamePrefix, new AtomicInteger(0), false);
+ }
+
+ /**
+ * Constructs a new {@code Thread}. Implementations may also initialize
+ * priority, name, daemon status, {@code ThreadGroup}, etc.
+ *
+ * @param runnable a runnable to be executed by new thread instance
+ * @return constructed thread, or {@code null} if the request to
+ * create a thread is rejected
+ */
+ @Override
+ public Thread newThread(final Runnable runnable) {
+
+ StringBuilder threadName = new StringBuilder(threadNamePrefix);
+ if (null != threadIndex) {
+ threadName.append("-").append(threadIndex.incrementAndGet());
+ }
+ Thread thread = new Thread(runnable, threadName.toString());
+ thread.setDaemon(daemon);
+ if (null != priority) {
+ thread.setPriority(priority);
+ }
+
+ return thread;
+ }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadPoolFactory.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadPoolFactory.java
index 8b6e4379d..551d92c9a 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadPoolFactory.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadPoolFactory.java
@@ -24,67 +24,36 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+public abstract class ThreadPoolFactory {
-public class ThreadPoolFactory {
+ private ThreadPoolFactory() {
+ }
public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, final String threadName) {
return createThreadPoolExecutor(core, max, threadName, true);
}
public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, final String threadName,
- final boolean isDaemon) {
+ final boolean isDaemon) {
return createThreadPoolExecutor(core, max, new LinkedBlockingQueue<>(1000), threadName, isDaemon);
}
public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, BlockingQueue<Runnable> blockingQueue,
- final String threadName, final boolean isDaemon) {
+ final String threadName, final boolean isDaemon) {
return new ThreadPoolExecutor(core, max, 10 * 1000, TimeUnit.MILLISECONDS, blockingQueue,
- new ThreadFactoryBuilder().setNameFormat(threadName).setDaemon(isDaemon).build()
- );
- }
-
- public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, ThreadFactory threadFactory) {
- return createThreadPoolExecutor(core, max, new LinkedBlockingQueue<>(1000), threadFactory);
+ new EventMeshThreadFactory(threadName, isDaemon));
}
public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, BlockingQueue<Runnable> blockingQueue,
- ThreadFactory threadFactory) {
+ ThreadFactory threadFactory) {
return new ThreadPoolExecutor(core, max, 10 * 1000, TimeUnit.MILLISECONDS, blockingQueue, threadFactory);
}
public static ScheduledExecutorService createSingleScheduledExecutor(final String threadName) {
- return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
- private AtomicInteger ai = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, threadName + ai.incrementAndGet());
- thread.setDaemon(true);
- return thread;
- }
- });
+ return Executors.newSingleThreadScheduledExecutor(new EventMeshThreadFactory(threadName, true));
}
- public static ScheduledExecutorService createScheduledExecutor(int core, final String threadName) {
- return createScheduledExecutor(core, threadName, true);
- }
-
- public static ScheduledExecutorService createScheduledExecutor(int core, final String threadName,
- final boolean isDaemon) {
- return Executors.newScheduledThreadPool(core, new ThreadFactory() {
- private AtomicInteger ai = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, threadName + ai.incrementAndGet());
- thread.setDaemon(isDaemon);
- return thread;
- }
- });
- }
public static ScheduledExecutorService createScheduledExecutor(int core, ThreadFactory threadFactory) {
return Executors.newScheduledThreadPool(core, threadFactory);
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImplTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/EventMeshThreadFactoryTest.java
similarity index 73%
rename from eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImplTest.java
rename to eventmesh-common/src/test/java/org/apache/eventmesh/common/EventMeshThreadFactoryTest.java
index f9b482185..944493260 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImplTest.java
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/EventMeshThreadFactoryTest.java
@@ -15,25 +15,29 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.util;
+package org.apache.eventmesh.common;
import org.junit.Assert;
import org.junit.Test;
-public class EventMeshThreadFactoryImplTest {
+public class EventMeshThreadFactoryTest {
@Test
public void testGetThreadNamePrefix() {
final String threadNamePrefix = "threadNamePrefix";
- EventMeshThreadFactoryImpl factory = new EventMeshThreadFactoryImpl(threadNamePrefix, false);
+ EventMeshThreadFactory factory = new EventMeshThreadFactory(threadNamePrefix, false);
Assert.assertEquals(threadNamePrefix, factory.getThreadNamePrefix());
}
@Test
public void testNewThread() {
final String threadNamePrefix = "threadNamePrefix";
- EventMeshThreadFactoryImpl factory = new EventMeshThreadFactoryImpl(threadNamePrefix, true);
- Thread t = factory.newThread(() -> {});
+ EventMeshThreadFactory factory = new EventMeshThreadFactory(threadNamePrefix, true);
+ Thread t = factory.newThread(() -> {
+
+ });
Assert.assertNotNull(t);
+ Assert.assertEquals(threadNamePrefix + "-1", t.getName());
+ Assert.assertTrue(t.isDaemon());
}
}
\ No newline at end of file
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/consumer/RabbitmqConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/consumer/RabbitmqConsumer.java
index b2da7db84..6e1a77d60 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/consumer/RabbitmqConsumer.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/consumer/RabbitmqConsumer.java
@@ -61,7 +61,7 @@ public class RabbitmqConsumer implements Consumer {
private final ThreadPoolExecutor executor = ThreadPoolFactory.createThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 2,
- "EventMesh-Rabbitmq-Consumer-");
+ "EventMesh-Rabbitmq-Consumer");
private RabbitmqConsumerHandler rabbitmqConsumerHandler;
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 9d5f408c2..5cfe1bc73 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.impl.consumer;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.connector.rocketmq.patch.EventMeshConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -28,7 +29,6 @@ import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
@@ -81,10 +81,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
- new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroup + "_"));
+ new EventMeshThreadFactory("ConsumeMessageThread_" + consumerGroup + "_"));
- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
- this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new EventMeshThreadFactory("ConsumeMessageScheduledThread_"));
+ this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new EventMeshThreadFactory("CleanExpireMsgScheduledThread_"));
log.info("new ConsumeMessageConcurrentlyService instance for eventMesh has been created ");
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java
index 68a9ce7d2..85a7e4e72 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java
@@ -18,7 +18,6 @@
package org.apache.eventmesh.grpc.pub.eventmeshmessage;
import org.apache.eventmesh.client.grpc.producer.EventMeshGrpcProducer;
-import org.apache.eventmesh.common.EventMeshMessage;
import org.apache.eventmesh.common.ExampleConstants;
import org.apache.eventmesh.grpc.GrpcAbstractDemo;
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java
index 64c4919b6..5b179b1b9 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/eventmeshmessage/AsyncPublishInstance.java
@@ -17,23 +17,16 @@
package org.apache.eventmesh.http.demo.pub.eventmeshmessage;
-import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.EventMeshMessage;
import org.apache.eventmesh.common.ExampleConstants;
-import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
-import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.http.demo.HttpAbstractDemo;
-import org.apache.eventmesh.util.Utils;
-
-import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index 9f00023fc..f1b23d5f4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -121,7 +121,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
private final transient EventMeshHTTPConfiguration eventMeshHttpConfiguration;
private final transient ThreadPoolExecutor asyncContextCompleteHandler =
- ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-");
+ ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext");
private static final int MAX_CONNECTIONS = 20_000;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
index 71742cfa9..603990025 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
@@ -17,11 +17,9 @@
package org.apache.eventmesh.runtime.boot;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.utils.ThreadUtils;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,41 +74,17 @@ public abstract class AbstractRemotingServer {
}
private EventLoopGroup initBossGroup(final String threadPrefix) {
- bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
- private final AtomicInteger count = new AtomicInteger(0);
-
- @Override
- public Thread newThread(final Runnable r) {
- final Thread t = new Thread(r, threadPrefix + "-boss-" + count.incrementAndGet());
- t.setDaemon(true);
- return t;
- }
- });
-
+ bossGroup = new NioEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "-boss", true));
return bossGroup;
}
private EventLoopGroup initIOGroup(final String threadPrefix, final int threadNum) {
- ioGroup = new NioEventLoopGroup(threadNum, new ThreadFactory() {
- private final AtomicInteger count = new AtomicInteger(0);
-
- @Override
- public Thread newThread(final Runnable r) {
- return new Thread(r, threadPrefix + "-io-" + count.incrementAndGet());
- }
- });
+ ioGroup = new NioEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-io"));
return ioGroup;
}
private EventLoopGroup initWorkerGroup(final String threadPrefix, final int threadNum) {
- workerGroup = new NioEventLoopGroup(threadNum, new ThreadFactory() {
- private final AtomicInteger count = new AtomicInteger(0);
-
- @Override
- public Thread newThread(final Runnable r) {
- return new Thread(r, threadPrefix + "-worker-" + count.incrementAndGet());
- }
- });
+ workerGroup = new NioEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-worker"));
return workerGroup;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
index 089e66b17..7aa4439a3 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
@@ -245,7 +245,7 @@ public class EventMeshGrpcServer {
sendMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshGrpcConfiguration.getEventMeshServerSendMsgThreadNum(),
eventMeshGrpcConfiguration.getEventMeshServerSendMsgThreadNum(), sendMsgThreadPoolQueue,
- "eventMesh-grpc-sendMsg-%d", true);
+ "eventMesh-grpc-sendMsg", true);
BlockingQueue<Runnable> subscribeMsgThreadPoolQueue =
new LinkedBlockingQueue<Runnable>(eventMeshGrpcConfiguration.getEventMeshServerSubscribeMsgBlockQueueSize());
@@ -253,7 +253,7 @@ public class EventMeshGrpcServer {
clientMgmtExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshGrpcConfiguration.getEventMeshServerSubscribeMsgThreadNum(),
eventMeshGrpcConfiguration.getEventMeshServerSubscribeMsgThreadNum(), subscribeMsgThreadPoolQueue,
- "eventMesh-grpc-clientMgmt-%d", true);
+ "eventMesh-grpc-clientMgmt", true);
BlockingQueue<Runnable> pushMsgThreadPoolQueue =
new LinkedBlockingQueue<Runnable>(eventMeshGrpcConfiguration.getEventMeshServerPushMsgBlockQueueSize());
@@ -261,12 +261,12 @@ public class EventMeshGrpcServer {
pushMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshGrpcConfiguration.getEventMeshServerPushMsgThreadNum(),
eventMeshGrpcConfiguration.getEventMeshServerPushMsgThreadNum(), pushMsgThreadPoolQueue,
- "eventMesh-grpc-pushMsg-%d", true);
+ "eventMesh-grpc-pushMsg", true);
replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshGrpcConfiguration.getEventMeshServerReplyMsgThreadNum(),
eventMeshGrpcConfiguration.getEventMeshServerReplyMsgThreadNum(), sendMsgThreadPoolQueue,
- "eventMesh-grpc-replyMsg-%d", true);
+ "eventMesh-grpc-replyMsg", true);
}
private void initHttpClientPool() {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 77fd38771..5a08d6334 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -142,44 +142,44 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerBatchMsgThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerBatchMsgThreadNum(),
- new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.getEventMeshServerBatchBlockQSize()),
- "eventMesh-batchMsg-", true);
+ new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerBatchBlockQSize()),
+ "eventMesh-batchMsg", true);
sendMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerSendMsgThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerSendMsgThreadNum(),
- new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.getEventMeshServerSendMsgBlockQSize()),
- "eventMesh-sendMsg-", true);
+ new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerSendMsgBlockQSize()),
+ "eventMesh-sendMsg", true);
remoteMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerRemoteMsgThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerRemoteMsgThreadNum(),
- new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.getEventMeshServerRemoteMsgBlockQSize()),
- "eventMesh-remoteMsg-", true);
+ new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerRemoteMsgBlockQSize()),
+ "eventMesh-remoteMsg", true);
pushMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerPushMsgThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerPushMsgThreadNum(),
- new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.getEventMeshServerPushMsgBlockQSize()),
- "eventMesh-pushMsg-", true);
+ new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerPushMsgBlockQSize()),
+ "eventMesh-pushMsg", true);
clientManageExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerClientManageThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerClientManageThreadNum(),
- new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.getEventMeshServerClientManageBlockQSize()),
- "eventMesh-clientManage-", true);
+ new LinkedBlockingQueue<>(eventMeshHttpConfiguration.getEventMeshServerClientManageBlockQSize()),
+ "eventMesh-clientManage", true);
adminExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerAdminThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerAdminThreadNum(),
- new LinkedBlockingQueue<Runnable>(50), "eventMesh-admin-",
+ new LinkedBlockingQueue<>(50), "eventMesh-admin",
true);
replyMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerReplyMsgThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerReplyMsgThreadNum(),
- new LinkedBlockingQueue<Runnable>(100),
- "eventMesh-replyMsg-", true);
+ new LinkedBlockingQueue<>(100),
+ "eventMesh-replyMsg", true);
}
public ThreadPoolExecutor getBatchMsgExecutor() {
@@ -397,7 +397,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
webhookExecutor = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshHttpConfiguration.getEventMeshServerWebhookThreadNum(),
eventMeshHttpConfiguration.getEventMeshServerWebhookThreadNum(),
- new LinkedBlockingQueue<Runnable>(100), "eventMesh-webhook-", true);
+ new LinkedBlockingQueue<>(100), "eventMesh-webhook", true);
final WebHookProcessor webHookProcessor = new WebHookProcessor();
final WebHookController webHookController = new WebHookController();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index 1bdce7b14..f4f89ada1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.runtime.boot;
import org.apache.eventmesh.api.registry.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshUnRegisterInfo;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.codec.Codec;
@@ -37,7 +38,6 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.Eventmesh
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
import org.apache.eventmesh.runtime.registry.Registry;
-import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManager;
import java.util.List;
@@ -334,19 +334,19 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
super.init("eventMesh-tcp");
scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.eventMeshTcpGlobalScheduler,
- new EventMeshThreadFactoryImpl("eventMesh-tcp-scheduler", true));
+ new EventMeshThreadFactory("eventMesh-tcp-scheduler", true));
taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
new LinkedBlockingQueue<>(10_000),
- new EventMeshThreadFactoryImpl("eventMesh-tcp-task-handle", true));
+ new EventMeshThreadFactory("eventMesh-tcp-task-handle", true));
broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
new LinkedBlockingQueue<>(10_000),
- new EventMeshThreadFactoryImpl("eventMesh-tcp-msg-downstream", true));
+ new EventMeshThreadFactory("eventMesh-tcp-msg-downstream", true));
}
private void shutdownThreadPool() {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
index 0022e24ff..2ad6cce02 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
@@ -17,15 +17,14 @@
package org.apache.eventmesh.runtime.core.protocol.grpc.retry;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,20 +57,11 @@ public class GrpcRetryer {
public void init() {
pool = new ThreadPoolExecutor(
- grpcConfiguration.getEventMeshServerRetryThreadNum(),
- grpcConfiguration.getEventMeshServerRetryThreadNum(), 60000, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(grpcConfiguration.getEventMeshServerRetryBlockQueueSize()),
- new ThreadFactory() {
- private AtomicInteger count = new AtomicInteger();
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, "grpc-retry-" + count.incrementAndGet());
- thread.setPriority(Thread.NORM_PRIORITY);
- thread.setDaemon(true);
- return thread;
- }
- }, new ThreadPoolExecutor.AbortPolicy());
+ grpcConfiguration.getEventMeshServerRetryThreadNum(),
+ grpcConfiguration.getEventMeshServerRetryThreadNum(), 60000, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(grpcConfiguration.getEventMeshServerRetryBlockQueueSize()),
+ new EventMeshThreadFactory("grpc-retry", true, Thread.NORM_PRIORITY),
+ new ThreadPoolExecutor.AbortPolicy());
dispatcher = new Thread(() -> {
try {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 4e060e3a4..ceafe19d2 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -85,9 +85,9 @@ public class EventMeshConsumer {
this.eventMeshHTTPServer = eventMeshHTTPServer;
this.consumerGroupConf = consumerGroupConf;
this.persistentMqConsumer = new MQConsumerWrapper(
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshConnectorPluginType());
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshConnectorPluginType());
this.broadcastMqConsumer = new MQConsumerWrapper(
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshConnectorPluginType());
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshConnectorPluginType());
}
private MessageHandler httpMessageHandler;
@@ -99,26 +99,26 @@ public class EventMeshConsumer {
keyValue.put(CONSUMER_GROUP, consumerGroupConf.getConsumerGroup());
keyValue.put(EVENT_MESH_IDC, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
keyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
persistentMqConsumer.init(keyValue);
EventListener clusterEventListener = (event, context) -> {
String protocolVersion =
- Objects.requireNonNull(event.getSpecVersion()).toString();
+ Objects.requireNonNull(event.getSpecVersion()).toString();
Span span = TraceUtils.prepareServerSpan(
- EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
- EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
+ EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
+ EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
try {
String topic = event.getSubject();
String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO)).toString();
String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID)).toString();
event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
- .build();
+ .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
+ .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
+ .build();
if (messageLogger.isDebugEnabled()) {
messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
} else {
@@ -126,12 +126,12 @@ public class EventMeshConsumer {
}
ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
- topic, null);
+ topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
if (currentTopicConfig == null) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}",
- consumerGroupConf.getConsumerGroup(), topic);
+ consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
@@ -142,13 +142,13 @@ public class EventMeshConsumer {
}
SubscriptionItem subscriptionItem =
- consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
+ consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
HandleMsgContext handleMsgContext = new HandleMsgContext(
- EventMeshUtil.buildPushMsgSeqNo(),
- consumerGroupConf.getConsumerGroup(),
- EventMeshConsumer.this,
- topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
- consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+ EventMeshUtil.buildPushMsgSeqNo(),
+ consumerGroupConf.getConsumerGroup(),
+ EventMeshConsumer.this,
+ topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
+ consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
@@ -172,25 +172,25 @@ public class EventMeshConsumer {
broadcastKeyValue.put(CONSUMER_GROUP, consumerGroupConf.getConsumerGroup());
broadcastKeyValue.put(EVENT_MESH_IDC, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
broadcastKeyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
broadcastMqConsumer.init(broadcastKeyValue);
EventListener broadcastEventListener = (event, context) -> {
String protocolVersion =
- Objects.requireNonNull(event.getSpecVersion()).toString();
+ Objects.requireNonNull(event.getSpecVersion()).toString();
Span span = TraceUtils.prepareServerSpan(
- EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
- EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
+ EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
+ EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
try {
event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
- String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
- .build();
+ .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
+ .build();
String topic = event.getSubject();
String bizSeqNo = getEventExtension(event, ProtocolKey.ClientInstanceKey.BIZSEQNO, "");
@@ -200,18 +200,18 @@ public class EventMeshConsumer {
messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
} else {
messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}",
- topic, bizSeqNo,
- uniqueId);
+ topic, bizSeqNo,
+ uniqueId);
}
ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
- consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
+ consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
- (EventMeshAsyncConsumeContext) context;
+ (EventMeshAsyncConsumeContext) context;
if (currentTopicConfig == null) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}",
- consumerGroupConf.getConsumerGroup(), topic);
+ consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
@@ -222,15 +222,15 @@ public class EventMeshConsumer {
}
SubscriptionItem subscriptionItem =
- consumerGroupConf.getConsumerGroupTopicConf().get(topic)
- .getSubscriptionItem();
+ consumerGroupConf.getConsumerGroupTopicConf().get(topic)
+ .getSubscriptionItem();
HandleMsgContext handleMsgContext =
- new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
- consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
- topic, event, subscriptionItem,
- eventMeshAsyncConsumeContext.getAbstractContext(),
- consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
- currentTopicConfig);
+ new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
+ consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
+ topic, event, subscriptionItem,
+ eventMeshAsyncConsumeContext.getAbstractContext(),
+ consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
+ currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
@@ -312,16 +312,16 @@ public class EventMeshConsumer {
public void sendMessageBack(final CloudEvent event, final String uniqueId, String bizSeqNo) throws Exception {
EventMeshProducer sendMessageBack
- = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup());
+ = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup());
if (sendMessageBack == null) {
logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}",
- consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
+ consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
return;
}
final SendMessageContext sendMessageBackContext = new SendMessageContext(bizSeqNo, event, sendMessageBack,
- eventMeshHTTPServer);
+ eventMeshHTTPServer);
sendMessageBack.send(sendMessageBackContext, new SendCallback() {
@Override
@@ -331,7 +331,7 @@ public class EventMeshConsumer {
@Override
public void onException(OnExceptionContext context) {
logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
- consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
+ consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
}
});
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
index b7faae46a..f87ab0501 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
@@ -17,14 +17,13 @@
package org.apache.eventmesh.runtime.core.protocol.http.retry;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,22 +56,12 @@ public class HttpRetryer {
public void init() {
pool = new ThreadPoolExecutor(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
- 60000,
- TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryBlockQSize()),
- new ThreadFactory() {
- private AtomicInteger count = new AtomicInteger();
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, "http-retry-" + count.incrementAndGet());
- thread.setPriority(Thread.NORM_PRIORITY);
- thread.setDaemon(true);
- return thread;
- }
- },
- new ThreadPoolExecutor.AbortPolicy());
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
+ 60000,
+ TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryBlockQSize()),
+ new EventMeshThreadFactory("http-retry", true, Thread.NORM_PRIORITY),
+ new ThreadPoolExecutor.AbortPolicy());
dispatcher = new Thread(() -> {
try {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
index 560cf40e2..48406c38a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
@@ -17,9 +17,9 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import java.util.concurrent.ScheduledExecutorService;
@@ -47,8 +47,8 @@ public class EventMeshRebalanceService {
}
public void init() {
- this.serviceRebalanceScheduler = ThreadPoolFactory.createScheduledExecutor(5, new EventMeshThreadFactoryImpl("proxy-rebalance-sch", true));
- logger.info("rebalance service inited......");
+ this.serviceRebalanceScheduler = ThreadPoolFactory.createScheduledExecutor(5, new EventMeshThreadFactory("proxy-rebalance-sch", true));
+ logger.info("rebalance service inited ......");
}
public void start() throws Exception {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
index 6108d8961..8f6d2e8ad 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
@@ -17,10 +17,10 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
-import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import java.util.concurrent.ArrayBlockingQueue;
@@ -43,7 +43,7 @@ public class EventMeshTcpRetryer {
3,
60000,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000),
- new EventMeshThreadFactoryImpl("eventMesh-tcp-retry", true),
+ new EventMeshThreadFactory("eventMesh-tcp-retry", true),
new ThreadPoolExecutor.AbortPolicy());
private Thread dispatcher;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
index 5d06abf3e..fe9fca4bc 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
@@ -17,11 +17,12 @@
package org.apache.eventmesh.runtime.metrics.grpc;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.metrics.api.MetricsRegistry;
import org.apache.eventmesh.metrics.api.model.GrpcSummaryMetrics;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
-import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
+
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
@@ -49,7 +50,7 @@ public class EventMeshGrpcMonitor {
this.metricsRegistries = Preconditions.checkNotNull(metricsRegistries);
this.grpcSummaryMetrics = new GrpcSummaryMetrics();
this.scheduler = ThreadPoolFactory.createScheduledExecutor(SCHEDULE_THREAD_SIZE,
- new EventMeshThreadFactoryImpl(THREAD_NAME_PREFIX, true));
+ new EventMeshThreadFactory(THREAD_NAME_PREFIX, true));
}
public void init() throws Exception {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
index 7ceecc08e..2b3707ead 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.runtime.metrics.http;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.metrics.api.MetricsRegistry;
import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
@@ -25,9 +26,8 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,17 +43,17 @@ public class HTTPMetricsServer {
private final transient HttpSummaryMetrics summaryMetrics;
public HTTPMetricsServer(final EventMeshHTTPServer eventMeshHTTPServer,
- final List<MetricsRegistry> metricsRegistries) {
+ final List<MetricsRegistry> metricsRegistries) {
Objects.requireNonNull(eventMeshHTTPServer, "EventMeshHTTPServer can not be null");
Objects.requireNonNull(metricsRegistries, "List<MetricsRegistry> can not be null");
this.eventMeshHTTPServer = eventMeshHTTPServer;
this.metricsRegistries = metricsRegistries;
this.summaryMetrics = new HttpSummaryMetrics(
- eventMeshHTTPServer.batchMsgExecutor,
- eventMeshHTTPServer.sendMsgExecutor,
- eventMeshHTTPServer.pushMsgExecutor,
- eventMeshHTTPServer.getHttpRetryer().getFailedQueue());
+ eventMeshHTTPServer.batchMsgExecutor,
+ eventMeshHTTPServer.sendMsgExecutor,
+ eventMeshHTTPServer.pushMsgExecutor,
+ eventMeshHTTPServer.getHttpRetryer().getFailedQueue());
init();
}
@@ -105,96 +105,82 @@ public class HTTPMetricsServer {
}
}
- private static ScheduledExecutorService metricsSchedule = Executors.newScheduledThreadPool(2, new ThreadFactory() {
- private final transient AtomicInteger seq = new AtomicInteger(0);
-
- @Override
- public Thread newThread(final Runnable r) {
- seq.incrementAndGet();
- final Thread t = new Thread(r, "eventMesh-metrics-" + seq.get());
- t.setDaemon(true);
- return t;
- }
- });
+ private static ScheduledExecutorService metricsSchedule = Executors.newScheduledThreadPool(2,
+ new EventMeshThreadFactory("eventMesh-metrics", true));
// todo: move this into standalone metrics plugin
private void logPrintServerMetrics(final HttpSummaryMetrics summaryMetrics,
- final EventMeshHTTPServer eventMeshHTTPServer) {
+ final EventMeshHTTPServer eventMeshHTTPServer) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("===========================================SERVER METRICS==================================================");
LOGGER.info("maxHTTPTPS: {}, avgHTTPTPS: {}, maxHTTPCOST: {}, avgHTTPCOST: {}, avgHTTPBodyDecodeCost: {}, httpDiscard: {}",
- summaryMetrics.maxHTTPTPS(),
- summaryMetrics.avgHTTPTPS(),
- summaryMetrics.maxHTTPCost(),
- summaryMetrics.avgHTTPCost(),
- summaryMetrics.avgHTTPBodyDecodeCost(),
- summaryMetrics.getHttpDiscard());
+ summaryMetrics.maxHTTPTPS(),
+ summaryMetrics.avgHTTPTPS(),
+ summaryMetrics.maxHTTPCost(),
+ summaryMetrics.avgHTTPCost(),
+ summaryMetrics.avgHTTPBodyDecodeCost(),
+ summaryMetrics.getHttpDiscard());
}
summaryMetrics.httpStatInfoClear();
-
if (LOGGER.isInfoEnabled()) {
LOGGER.info("maxBatchSendMsgTPS: {}, avgBatchSendMsgTPS: {}, sum: {}. sumFail: {}, sumFailRate: {}, discard : {}",
- summaryMetrics.maxSendBatchMsgTPS(),
- summaryMetrics.avgSendBatchMsgTPS(),
- summaryMetrics.getSendBatchMsgNumSum(),
- summaryMetrics.getSendBatchMsgFailNumSum(),
- summaryMetrics.getSendBatchMsgFailRate(),
- summaryMetrics.getSendBatchMsgDiscardNumSum()
+ summaryMetrics.maxSendBatchMsgTPS(),
+ summaryMetrics.avgSendBatchMsgTPS(),
+ summaryMetrics.getSendBatchMsgNumSum(),
+ summaryMetrics.getSendBatchMsgFailNumSum(),
+ summaryMetrics.getSendBatchMsgFailRate(),
+ summaryMetrics.getSendBatchMsgDiscardNumSum()
);
}
summaryMetrics.cleanSendBatchStat();
-
if (LOGGER.isInfoEnabled()) {
LOGGER.info("maxSendMsgTPS: {}, avgSendMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, replyMsg: {}, replyFail: {}",
- summaryMetrics.maxSendMsgTPS(),
- summaryMetrics.avgSendMsgTPS(),
- summaryMetrics.getSendMsgNumSum(),
- summaryMetrics.getSendMsgFailNumSum(),
- summaryMetrics.getSendMsgFailRate(),
- summaryMetrics.getReplyMsgNumSum(),
- summaryMetrics.getReplyMsgFailNumSum()
+ summaryMetrics.maxSendMsgTPS(),
+ summaryMetrics.avgSendMsgTPS(),
+ summaryMetrics.getSendMsgNumSum(),
+ summaryMetrics.getSendMsgFailNumSum(),
+ summaryMetrics.getSendMsgFailRate(),
+ summaryMetrics.getReplyMsgNumSum(),
+ summaryMetrics.getReplyMsgFailNumSum()
);
}
summaryMetrics.cleanSendMsgStat();
-
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
- "maxPushMsgTPS: {}, avgPushMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, maxClientLatency: {}, avgClientLatency: {}",
- summaryMetrics.maxPushMsgTPS(),
- summaryMetrics.avgPushMsgTPS(),
- summaryMetrics.getHttpPushMsgNumSum(),
- summaryMetrics.getHttpPushFailNumSum(),
- summaryMetrics.getHttpPushMsgFailRate(),
- summaryMetrics.maxHTTPPushLatency(),
- summaryMetrics.avgHTTPPushLatency()
+ "maxPushMsgTPS: {}, avgPushMsgTPS: {}, sum: {}, sumFail: {}, sumFailRate: {}, maxClientLatency: {}, avgClientLatency: {}",
+ summaryMetrics.maxPushMsgTPS(),
+ summaryMetrics.avgPushMsgTPS(),
+ summaryMetrics.getHttpPushMsgNumSum(),
+ summaryMetrics.getHttpPushFailNumSum(),
+ summaryMetrics.getHttpPushMsgFailRate(),
+ summaryMetrics.maxHTTPPushLatency(),
+ summaryMetrics.avgHTTPPushLatency()
);
}
summaryMetrics.cleanHttpPushMsgStat();
-
if (LOGGER.isInfoEnabled()) {
LOGGER.info("batchMsgQ: {}, sendMsgQ: {}, pushMsgQ: {}, httpRetryQ: {}",
- eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size(),
- eventMeshHTTPServer.getSendMsgExecutor().getQueue().size(),
- eventMeshHTTPServer.getPushMsgExecutor().getQueue().size(),
- eventMeshHTTPServer.getHttpRetryer().size());
+ eventMeshHTTPServer.getBatchMsgExecutor().getQueue().size(),
+ eventMeshHTTPServer.getSendMsgExecutor().getQueue().size(),
+ eventMeshHTTPServer.getPushMsgExecutor().getQueue().size(),
+ eventMeshHTTPServer.getHttpRetryer().size());
}
-
if (LOGGER.isInfoEnabled()) {
LOGGER.info("batchAvgSend2MQCost: {}, avgSend2MQCost: {}, avgReply2MQCost: {}",
- summaryMetrics.avgBatchSendMsgCost(),
- summaryMetrics.avgSendMsgCost(),
- summaryMetrics.avgReplyMsgCost());
+ summaryMetrics.avgBatchSendMsgCost(),
+ summaryMetrics.avgSendMsgCost(),
+ summaryMetrics.avgReplyMsgCost());
}
summaryMetrics.send2MQStatInfoClear();
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImpl.java
deleted file mode 100644
index cf8390411..000000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshThreadFactoryImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.util;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class EventMeshThreadFactoryImpl implements ThreadFactory {
- private final AtomicLong threadIndex = new AtomicLong(0);
- private final String threadNamePrefix;
- private Boolean isDaemonSpecified = null;
-
- public EventMeshThreadFactoryImpl(final String threadNamePrefix) {
- this.threadNamePrefix = threadNamePrefix;
- }
-
- public EventMeshThreadFactoryImpl(final String threadNamePrefix, final boolean isDaemonSpecified) {
- this.threadNamePrefix = threadNamePrefix;
- this.isDaemonSpecified = isDaemonSpecified;
- }
-
- public String getThreadNamePrefix() {
- return threadNamePrefix;
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, threadNamePrefix + '-' + this.threadIndex.incrementAndGet());
- if (isDaemonSpecified != null) {
- t.setDaemon(isDaemonSpecified);
- }
- return t;
- }
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index 1303529aa..badb953b8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.runtime.util;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.RandomStringUtils;
@@ -292,7 +293,7 @@ public class EventMeshUtil {
public static void printState(final ThreadPoolExecutor scheduledExecutorService) {
if (LOGGER.isInfoEnabled()) {
- LOGGER.info("{} [{} {} {} {}]", ((EventMeshThreadFactoryImpl) scheduledExecutorService.getThreadFactory())
+ LOGGER.info("{} [{} {} {} {}]", ((EventMeshThreadFactory) scheduledExecutorService.getThreadFactory())
.getThreadNamePrefix(), scheduledExecutorService.getQueue().size(), scheduledExecutorService
.getPoolSize(), scheduledExecutorService.getActiveCount(), scheduledExecutorService
.getCompletedTaskCount());
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/TCPClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/TCPClient.java
index 0a14fee35..b660f6640 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/TCPClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/TCPClient.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.runtime.client.common;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.protocol.tcp.Package;
import java.io.Closeable;
@@ -25,10 +26,8 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +47,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-
/**
* one Client connects one ACCESS
* Provides the most basic connection, send capability, and cannot provide disconnected reconnection capability,
@@ -70,27 +68,9 @@ public abstract class TCPClient implements Closeable {
private Bootstrap bootstrap = new Bootstrap();
protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4,
- new ThreadFactory() {
- AtomicInteger count = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "TCPClientScheduler-" + count.incrementAndGet());
- t.setDaemon(true);
- return t;
- }
- }
- );
+ new EventMeshThreadFactory("TCPClientScheduler", true));
- private NioEventLoopGroup workers = new NioEventLoopGroup(8, new ThreadFactory() {
- AtomicInteger count = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "TCPClientWorker-" + count.incrementAndGet());
- return t;
- }
- });
+ private NioEventLoopGroup workers = new NioEventLoopGroup(8, new EventMeshThreadFactory("TCPClientWorker"));
public Channel channel;
@@ -141,15 +121,15 @@ public abstract class TCPClient implements Closeable {
bootstrap.group(workers);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1_000)
- .option(ChannelOption.SO_KEEPALIVE, false)
- .option(ChannelOption.SO_SNDBUF, 64 * 1024)
- .option(ChannelOption.SO_RCVBUF, 64 * 1024)
- .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 8192, 65536))
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ .option(ChannelOption.SO_KEEPALIVE, false)
+ .option(ChannelOption.SO_SNDBUF, 64 * 1024)
+ .option(ChannelOption.SO_RCVBUF, 64 * 1024)
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 8192, 65536))
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new Codec.Encoder(), new Codec.Decoder())
- .addLast(handler, newExceptionHandler());
+ .addLast(handler, newExceptionHandler());
}
});
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshUtilTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshUtilTest.java
index 338b2708b..2c715b373 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshUtilTest.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/EventMeshUtilTest.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.runtime.util;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
@@ -167,7 +168,7 @@ public class EventMeshUtilTest {
public void testPrintState() {
try {
ScheduledExecutorService serviceRebalanceScheduler = ThreadPoolFactory
- .createScheduledExecutor(5, new EventMeshThreadFactoryImpl("proxy-rebalance-sch", true));
+ .createScheduledExecutor(5, new EventMeshThreadFactory("proxy-rebalance-sch", true));
EventMeshUtil.printState((ThreadPoolExecutor) serviceRebalanceScheduler);
} catch (Exception e) {
Assert.fail(e.getMessage());
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
index ce3d186aa..deba03614 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java
@@ -23,6 +23,7 @@ import static java.util.stream.Collectors.toList;
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.util.EventMeshClientUtil;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
@@ -53,8 +54,6 @@ import java.util.stream.Collectors;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -68,8 +67,8 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
private final transient Map<String, SubscriptionInfo> subscriptionMap = new ConcurrentHashMap<>();
private final transient ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setNameFormat("GRPCClientScheduler").setDaemon(true).build());
+ Runtime.getRuntime().availableProcessors(),
+ new EventMeshThreadFactory("GRPCClientScheduler", true));
private transient ConsumerServiceBlockingStub consumerClient;
private transient ConsumerServiceStub consumerAsyncClient;
@@ -84,7 +83,7 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
public void init() {
channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort())
- .usePlaintext().build();
+ .usePlaintext().build();
consumerClient = ConsumerServiceGrpc.newBlockingStub(channel);
consumerAsyncClient = ConsumerServiceGrpc.newStub(channel);
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
index 2e8440237..bb8ec1e77 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/EventMeshHttpConsumer.java
@@ -24,6 +24,7 @@ import org.apache.eventmesh.client.http.model.RequestParam;
import org.apache.eventmesh.client.http.util.HttpUtils;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
@@ -49,8 +50,6 @@ import java.util.stream.Collectors;
import io.netty.handler.codec.http.HttpMethod;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -71,14 +70,11 @@ public class EventMeshHttpConsumer extends AbstractHttpClient implements AutoClo
throws EventMeshException {
super(eventMeshHttpClientConfig);
this.consumeExecutor = Optional.ofNullable(customExecutor).orElseGet(
- () -> ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpClientConfig.getConsumeThreadCore(),
- eventMeshHttpClientConfig.getConsumeThreadMax(), "EventMesh-client-consume-")
- );
-
- this.scheduler = new ScheduledThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setNameFormat("HTTPClientScheduler").setDaemon(true).build()
+ () -> ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpClientConfig.getConsumeThreadCore(),
+ eventMeshHttpClientConfig.getConsumeThreadMax(), "EventMesh-client-consume")
);
+ this.scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+ new EventMeshThreadFactory("HTTPClientScheduler", true));
}
/**
@@ -190,17 +186,17 @@ public class EventMeshHttpConsumer extends AbstractHttpClient implements AutoClo
private RequestParam buildCommonRequestParam() {
return new RequestParam(HttpMethod.POST)
- .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshHttpClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshHttpClientConfig.getIp())
- .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshHttpClientConfig.getPid())
- .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpClientConfig.getSys())
- .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName())
- .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword())
- // add protocol version?
- .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
- .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
- .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
- .addBody(HeartbeatRequestBody.CONSUMERGROUP, eventMeshHttpClientConfig.getConsumerGroup());
+ .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpClientConfig.getEnv())
+ .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshHttpClientConfig.getIdc())
+ .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshHttpClientConfig.getIp())
+ .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshHttpClientConfig.getPid())
+ .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpClientConfig.getSys())
+ .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName())
+ .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword())
+ // add protocol version?
+ .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+ .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+ .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+ .addBody(HeartbeatRequestBody.CONSUMERGROUP, eventMeshHttpClientConfig.getConsumerGroup());
}
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
index fda7fbe5b..74b32c515 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
@@ -18,17 +18,20 @@
package org.apache.eventmesh.client.tcp.common;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.protocol.tcp.codec.Codec;
+
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -49,7 +52,6 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -72,9 +74,8 @@ public abstract class TcpClient implements Closeable {
private transient ScheduledFuture<?> heartTask;
- protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setNameFormat("TCPClientScheduler").setDaemon(true).build());
+ protected static final ScheduledExecutorService scheduler = ThreadPoolFactory.createScheduledExecutor(Runtime.getRuntime().availableProcessors(),
+ new EventMeshThreadFactory("TCPClientScheduler", true));
public TcpClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) {
Preconditions.checkNotNull(eventMeshTcpClientConfig, "EventMeshTcpClientConfig cannot be null");
@@ -89,16 +90,16 @@ public abstract class TcpClient implements Closeable {
bootstrap.group(workers);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1_000)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.SO_SNDBUF, 64 * 1024)
- .option(ChannelOption.SO_RCVBUF, 64 * 1024)
- .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 8192, 65536))
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.SO_SNDBUF, 64 * 1024)
+ .option(ChannelOption.SO_RCVBUF, 64 * 1024)
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 8192, 65536))
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new Codec.Encoder(), new Codec.Decoder())
- .addLast(handler, newExceptionHandler());
+ .addLast(handler, newExceptionHandler());
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org