You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/02/12 04:36:12 UTC

[incubator-eventmesh] branch master updated: [ISSUE #3120]Change the EventMeshTCPConfiguration property from public to private (#3121)

This is an automated email from the ASF dual-hosted git repository.

jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new af7b6fbbc [ISSUE #3120]Change the EventMeshTCPConfiguration property from public to private (#3121)
af7b6fbbc is described below

commit af7b6fbbcc0296bd3a1554184e1e0cc2e9429249
Author: mxsm <lj...@gmail.com>
AuthorDate: Sun Feb 12 12:36:06 2023 +0800

    [ISSUE #3120]Change the EventMeshTCPConfiguration property from public to private (#3121)
---
 .../admin/controller/ClientManageController.java   |  2 +-
 .../admin/handler/ConfigurationHandler.java        |  2 +-
 .../eventmesh/runtime/boot/EventMeshTCPServer.java | 24 ++++-----
 .../configuration/EventMeshTCPConfiguration.java   | 62 +++++++++++-----------
 .../tcp/client/EventMeshTcpConnectionHandler.java  |  2 +-
 .../tcp/client/group/ClientGroupWrapper.java       | 10 ++--
 .../client/group/ClientSessionGroupMapping.java    |  7 ++-
 .../rebalance/EventMeshRebalanceService.java       |  2 +-
 .../client/rebalance/EventmeshRebalanceImpl.java   |  2 +-
 .../tcp/client/session/push/SessionPusher.java     |  6 +--
 .../client/session/retry/EventMeshTcpRetryer.java  | 10 ++--
 .../tcp/client/session/send/SessionSender.java     |  2 +-
 .../tcp/client/task/MessageTransferTask.java       |  6 +--
 .../runtime/boot/EventMeshServerTest.java          | 50 ++++++++---------
 .../EventMeshTCPConfigurationTest.java             | 50 ++++++++---------
 15 files changed, 116 insertions(+), 121 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
index ca4e8484d..4b04c9200 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
@@ -88,7 +88,7 @@ public class ClientManageController {
 
 
     public void start() throws IOException {
-        int port = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerAdminPort;
+        int port = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshServerAdminPort();
         HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
 
         HttpHandlerManager httpHandlerManager = new HttpHandlerManager();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ConfigurationHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ConfigurationHandler.java
index 3324f0f0c..afd702a62 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ConfigurationHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ConfigurationHandler.java
@@ -93,7 +93,7 @@ public class ConfigurationHandler extends AbstractHttpHandler {
                     eventMeshTCPConfiguration.isEventMeshServerSecurityEnable(),
                     eventMeshTCPConfiguration.isEventMeshServerRegistryEnable(),
                     // TCP Configuration
-                    eventMeshTCPConfiguration.eventMeshTcpServerPort,
+                    eventMeshTCPConfiguration.getEventMeshTcpServerPort(),
                     // HTTP Configuration
                     eventMeshHTTPConfiguration.getHttpServerPort(),
                     eventMeshHTTPConfiguration.isEventMeshServerUseTls(),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index f4f89ada1..034662636 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -150,9 +150,9 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
                             .addLast(
                                     getWorkerGroup(),
                                     new IdleStateHandler(
-                                            eventMeshTCPConfiguration.eventMeshTcpIdleReadSeconds,
-                                            eventMeshTCPConfiguration.eventMeshTcpIdleWriteSeconds,
-                                            eventMeshTCPConfiguration.eventMeshTcpIdleAllSeconds),
+                                            eventMeshTCPConfiguration.getEventMeshTcpIdleReadSeconds(),
+                                            eventMeshTCPConfiguration.getEventMeshTcpIdleWriteSeconds(),
+                                            eventMeshTCPConfiguration.getEventMeshTcpIdleAllSeconds()),
                                     new EventMeshTcpMessageDispatcher(EventMeshTCPServer.this),
                                     new EventMeshTcpExceptionHandler(EventMeshTCPServer.this)
                             );
@@ -176,7 +176,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
                     .childHandler(channelInitializer);
 
             try {
-                int port = eventMeshTCPConfiguration.eventMeshTcpServerPort;
+                int port = eventMeshTCPConfiguration.getEventMeshTcpServerPort();
                 ChannelFuture f = bootstrap.bind(port).sync();
                 LOGGER.info("EventMeshTCPServer[port={}] started.....", port);
                 f.channel().closeFuture().sync();
@@ -200,7 +200,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         }
         initThreadPool();
 
-        rateLimiter = RateLimiter.create(eventMeshTCPConfiguration.eventMeshTcpMsgReqnumPerSecond);
+        rateLimiter = RateLimiter.create(eventMeshTCPConfiguration.getEventMeshTcpMsgReqnumPerSecond());
 
         globalTrafficShapingHandler = newGTSHandler(scheduler, eventMeshTCPConfiguration.getGtc().getReadLimit());
 
@@ -300,7 +300,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         boolean registerResult = false;
         try {
             String endPoints = IPUtils.getLocalAddress()
-                    + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
+                    + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.getEventMeshTcpServerPort();
             EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
             eventMeshRegisterInfo.setEventMeshClusterName(eventMeshTCPConfiguration.getEventMeshCluster());
             eventMeshRegisterInfo.setEventMeshName(eventMeshTCPConfiguration.getEventMeshName() + "-"
@@ -318,7 +318,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
 
     private void unRegister() throws Exception {
         String endPoints = IPUtils.getLocalAddress()
-                + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
+                + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.getEventMeshTcpServerPort();
         EventMeshUnRegisterInfo eventMeshUnRegisterInfo = new EventMeshUnRegisterInfo();
         eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshTCPConfiguration.getEventMeshCluster());
         eventMeshUnRegisterInfo.setEventMeshName(eventMeshTCPConfiguration.getEventMeshName());
@@ -333,18 +333,18 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
     private void initThreadPool() throws Exception {
         super.init("eventMesh-tcp");
 
-        scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.eventMeshTcpGlobalScheduler,
+        scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.getEventMeshTcpGlobalScheduler(),
                 new EventMeshThreadFactory("eventMesh-tcp-scheduler", true));
 
         taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
-                eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
-                eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
+                eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(),
+                eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(),
                 new LinkedBlockingQueue<>(10_000),
                 new EventMeshThreadFactory("eventMesh-tcp-task-handle", true));
 
         broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
-                eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
-                eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
+                eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(),
+                eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(),
                 new LinkedBlockingQueue<>(10_000),
                 new EventMeshThreadFactory("eventMesh-tcp-msg-downstream", true));
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
index d89d043e1..e210bb276 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
@@ -31,95 +31,95 @@ import lombok.NoArgsConstructor;
 public class EventMeshTCPConfiguration extends CommonConfiguration {
 
     @ConfigFiled(field = "tcp.port")
-    public int eventMeshTcpServerPort = 10000;
+    private int eventMeshTcpServerPort = 10000;
 
     @ConfigFiled(field = "tcp.allIdleSeconds")
-    public int eventMeshTcpIdleAllSeconds = 60;
+    private int eventMeshTcpIdleAllSeconds = 60;
 
     @ConfigFiled(field = "tcp.writerIdleSeconds")
-    public int eventMeshTcpIdleWriteSeconds = 60;
+    private int eventMeshTcpIdleWriteSeconds = 60;
 
     @ConfigFiled(field = "tcp.readerIdleSeconds")
-    public int eventMeshTcpIdleReadSeconds = 60;
+    private int eventMeshTcpIdleReadSeconds = 60;
 
     @ConfigFiled(field = "tcp.msgReqnumPerSecond")
-    public Integer eventMeshTcpMsgReqnumPerSecond = 15000;
+    private Integer eventMeshTcpMsgReqnumPerSecond = 15000;
 
     /**
      * TCP Server allows max client num
      */
     @ConfigFiled(field = "tcp.clientMaxNum")
-    public int eventMeshTcpClientMaxNum = 10000;
+    private int eventMeshTcpClientMaxNum = 10000;
 
     //======================================= New add config =================================
 
     @ConfigFiled(field = "global.scheduler")
-    public int eventMeshTcpGlobalScheduler = 5;
+    private int eventMeshTcpGlobalScheduler = 5;
 
     @ConfigFiled(field = "tcp.taskHandleExecutorPoolSize")
-    public int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors();
+    private int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors();
 
     @ConfigFiled(field = "tcp.msgDownStreamExecutorPoolSize")
-    public int eventMeshTcpMsgDownStreamExecutorPoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8);
+    private int eventMeshTcpMsgDownStreamExecutorPoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8);
 
     @ConfigFiled(field = "session.expiredInMills")
-    public int eventMeshTcpSessionExpiredInMills = 60000;
+    private int eventMeshTcpSessionExpiredInMills = 60000;
 
     @ConfigFiled(field = "session.upstreamBufferSize")
-    public int eventMeshTcpSessionUpstreamBufferSize = 100;
+    private int eventMeshTcpSessionUpstreamBufferSize = 100;
 
     @ConfigFiled(field = "retry.async.pushRetryTimes")
-    public int eventMeshTcpMsgAsyncRetryTimes = 3;
+    private int eventMeshTcpMsgAsyncRetryTimes = 3;
 
     @ConfigFiled(field = "retry.sync.pushRetryTimes")
-    public int eventMeshTcpMsgSyncRetryTimes = 1;
+    private int eventMeshTcpMsgSyncRetryTimes = 1;
 
     @ConfigFiled(field = "retry.sync.pushRetryDelayInMills")
-    public int eventMeshTcpMsgRetrySyncDelayInMills = 500;
+    private int eventMeshTcpMsgRetrySyncDelayInMills = 500;
 
     @ConfigFiled(field = "retry.async.pushRetryDelayInMills")
-    public int eventMeshTcpMsgRetryAsyncDelayInMills = 500;
+    private int eventMeshTcpMsgRetryAsyncDelayInMills = 500;
 
     @ConfigFiled(field = "retry.pushRetryQueueSize")
-    public int eventMeshTcpMsgRetryQueueSize = 10000;
+    private int eventMeshTcpMsgRetryQueueSize = 10000;
 
     @ConfigFiled(field = "tcp.RebalanceIntervalInMills")
-    public Integer eventMeshTcpRebalanceIntervalInMills = 30 * 1000;
+    private Integer eventMeshTcpRebalanceIntervalInMills = 30 * 1000;
 
     @ConfigFiled(field = "admin.http.port")
-    public int eventMeshServerAdminPort = 10106;
+    private int eventMeshServerAdminPort = 10106;
 
     @ConfigFiled(field = "tcp.sendBack.enabled")
-    public boolean eventMeshTcpSendBackEnabled = Boolean.TRUE;
+    private boolean eventMeshTcpSendBackEnabled = Boolean.TRUE;
 
-    @ConfigFiled(field = "")
-    public int eventMeshTcpSendBackMaxTimes = 3;
+    @ConfigFiled(field = "tcp.SendBackMaxTimes")
+    private int eventMeshTcpSendBackMaxTimes = 3;
 
     @ConfigFiled(field = "tcp.pushFailIsolateTimeInMills")
-    public int eventMeshTcpPushFailIsolateTimeInMills = 30 * 1000;
+    private int eventMeshTcpPushFailIsolateTimeInMills = 30 * 1000;
 
     @ConfigFiled(field = "gracefulShutdown.sleepIntervalInMills")
-    public int gracefulShutdownSleepIntervalInMills = 1000;
+    private int gracefulShutdownSleepIntervalInMills = 1000;
 
     @ConfigFiled(field = "rebalanceRedirect.sleepIntervalInM")
-    public int sleepIntervalInRebalanceRedirectMills = 200;
+    private int sleepIntervalInRebalanceRedirectMills = 200;
 
     @ConfigFiled(field = "maxEventSize")
-    public int eventMeshEventSize = 1000;
+    private int eventMeshEventSize = 1000;
 
     @ConfigFiled(field = "maxEventBatchSize")
-    public int eventMeshEventBatchSize = 10;
+    private int eventMeshEventBatchSize = 10;
 
-    private final TrafficShapingConfig gtc = new TrafficShapingConfig(0, 10_000, 1_000, 2000);
+    private final TrafficShapingConfig gtc = new TrafficShapingConfig(0, 10_000, 1_000, 2_000);
     private final TrafficShapingConfig ctc = new TrafficShapingConfig(0, 2_000, 1_000, 10_000);
 
     @Data
     @NoArgsConstructor
     @AllArgsConstructor
     public static class TrafficShapingConfig {
-        long writeLimit = 0;
-        long readLimit = 1000;
-        long checkInterval = 1000;
-        long maxTime = 5000;
+        private long writeLimit = 0;
+        private long readLimit = 1000;
+        private long checkInterval = 1000;
+        private long maxTime = 5000;
     }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
index 4b3a10052..fe2f45f88 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
@@ -60,7 +60,7 @@ public class EventMeshTcpConnectionHandler extends ChannelDuplexHandler {
         logger.info("client|tcp|channelActive|remoteAddress={}|msg={}", remoteAddress, "");
 
         int c = connections.incrementAndGet();
-        if (c > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpClientMaxNum) {
+        if (c > eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpClientMaxNum()) {
             logger.warn("client|tcp|channelActive|remoteAddress={}|msg={}", remoteAddress, "too many client connect "
                     +
                     "this eventMesh server");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 245cf311e..c4f8de7ca 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -486,13 +486,9 @@ public class ClientGroupWrapper {
                                 group, topic, bizSeqNo, sendBackTimes,
                                 sendBackFromEventMeshIp);
 
-                        if (Objects.requireNonNull(sendBackTimes) >= eventMeshTCPServer
-                                .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
-                            log.error(
-                                    "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
-                                            + "bizSeqNo:{}", eventMeshTCPServer
-                                            .getEventMeshTCPConfiguration()
-                                            .eventMeshTcpSendBackMaxTimes,
+                        int eventMeshTcpSendBackMaxTimes = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpSendBackMaxTimes();
+                        if (Objects.requireNonNull(sendBackTimes) >= eventMeshTcpSendBackMaxTimes) {
+                            log.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, " + "bizSeqNo:{}", eventMeshTcpSendBackMaxTimes,
                                     group, topic, bizSeqNo);
                         } else {
                             sendBackTimes++;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 2ebc2aa64..b9c6547f8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -361,7 +361,7 @@ public class ClientSessionGroupMapping {
             () -> {
                 for (Session tmp : sessionTable.values()) {
                     if (System.currentTimeMillis() - tmp.getLastHeartbeatTime()
-                        > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) {
+                        > eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpSessionExpiredInMills()) {
                         try {
                             if (log.isWarnEnabled()) {
                                 log.warn("clean expired session,client:{}", tmp.getClient());
@@ -372,8 +372,7 @@ public class ClientSessionGroupMapping {
                         }
                     }
                 }
-            }, 1000, eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills,
-                TimeUnit.MILLISECONDS);
+            }, 1000, eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpSessionExpiredInMills(), TimeUnit.MILLISECONDS);
     }
 
     private void initDownStreamMsgContextCleaner() {
@@ -427,7 +426,7 @@ public class ClientSessionGroupMapping {
                 }
             }
             try {
-                Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().gracefulShutdownSleepIntervalInMills);
+                Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().getGracefulShutdownSleepIntervalInMills());
             } catch (InterruptedException e) {
                 log.warn("Thread.sleep occur InterruptedException", e);
             }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
index 48406c38a..20f087a07 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
@@ -43,7 +43,7 @@ public class EventMeshRebalanceService {
     public EventMeshRebalanceService(EventMeshTCPServer eventMeshTCPServer, EventMeshRebalanceStrategy rebalanceStrategy) {
         this.eventMeshTCPServer = eventMeshTCPServer;
         this.rebalanceStrategy = rebalanceStrategy;
-        this.rebalanceIntervalMills = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpRebalanceIntervalInMills;
+        this.rebalanceIntervalMills = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpRebalanceIntervalInMills();
     }
 
     public void init() {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
index 26405f8ec..ad51bcb32 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
@@ -171,7 +171,7 @@ public class EventmeshRebalanceImpl implements EventMeshRebalanceStrategy {
                     Integer.parseInt(newProxyPort), sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
             logger.info("doRebalance,redirect sessionAddr:{}", redirectSessionAddr);
             try {
-                Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().sleepIntervalInRebalanceRedirectMills);
+                Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().getSleepIntervalInRebalanceRedirectMills());
             } catch (InterruptedException e) {
                 logger.warn("Thread.sleep occur InterruptedException", e);
             }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
index ba30abaec..34942b983 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
@@ -129,14 +129,14 @@ public class SessionPusher {
 
                             //how long to isolate client when push fail
                             long isolateTime = System.currentTimeMillis()
-                                + session.getEventMeshTCPConfiguration().eventMeshTcpPushFailIsolateTimeInMills;
+                                + session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
                             session.setIsolateTime(isolateTime);
                             logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);
 
                             //retry
                             long delayTime = SubscriptionType.SYNC == downStreamMsgContext.subscriptionItem.getType()
-                                ? session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetrySyncDelayInMills
-                                : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryAsyncDelayInMills;
+                                ? session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
+                                : session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
                             downStreamMsgContext.delay(delayTime);
                             Objects.requireNonNull(session.getClientGroupWrapper().get()).getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
                         } else {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
index 8f6d2e8ad..8efa9d71d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
@@ -61,19 +61,19 @@ public class EventMeshTcpRetryer {
     }
 
     public void pushRetry(RetryContext retryContext) {
-        if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize) {
+        if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize()) {
             logger.error("pushRetry fail,retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}",
-                eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize, retryContext.retryTimes,
+                eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize(), retryContext.retryTimes,
                 retryContext.seq, EventMeshUtil.getMessageBizSeq(retryContext.event));
             return;
         }
 
-        int maxRetryTimes = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
+        int maxRetryTimes = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgAsyncRetryTimes();
         if (retryContext instanceof DownStreamMsgContext) {
             DownStreamMsgContext downStreamMsgContext = (DownStreamMsgContext) retryContext;
             maxRetryTimes = SubscriptionType.SYNC == downStreamMsgContext.subscriptionItem.getType()
-                ? eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes :
-                eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
+                ? eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgSyncRetryTimes() :
+                eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgAsyncRetryTimes();
         }
 
         if (retryContext.retryTimes >= maxRetryTimes) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
index 898f08cca..b5b7500f7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
@@ -83,7 +83,7 @@ public class SessionSender {
 
     public SessionSender(Session session) {
         this.session = session;
-        this.upstreamBuff = new Semaphore(session.getEventMeshTCPConfiguration().eventMeshTcpSessionUpstreamBufferSize);
+        this.upstreamBuff = new Semaphore(session.getEventMeshTCPConfiguration().getEventMeshTcpSessionUpstreamBufferSize());
     }
 
     public EventMeshTcpSendResult send(Header header, CloudEvent event, SendCallback sendCallback, long startTime,
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index ab35a727d..0c13db839 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -114,9 +114,9 @@ public class MessageTransferTask extends AbstractTask {
             }
 
             String content = new String(Objects.requireNonNull(event.getData()).toBytes(), StandardCharsets.UTF_8);
-            if (content.length() > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize) {
-                throw new Exception("event size exceeds the limit: "
-                        + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize);
+            int eventMeshEventSize = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshEventSize();
+            if (content.length() > eventMeshEventSize) {
+                throw new Exception("event size exceeds the limit: " + eventMeshEventSize);
             }
 
             //do acl check in sending msg
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java
index 84aedc1fd..f038810d5 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java
@@ -60,31 +60,31 @@ public class EventMeshServerTest {
     }
 
     private void assertTCPConfig(EventMeshTCPConfiguration config) {
-        Assert.assertEquals(config.eventMeshTcpServerPort, 816);
-        Assert.assertEquals(config.eventMeshTcpIdleAllSeconds, 1816);
-        Assert.assertEquals(config.eventMeshTcpIdleWriteSeconds, 2816);
-        Assert.assertEquals(config.eventMeshTcpIdleReadSeconds, 3816);
-        Assert.assertEquals(config.eventMeshTcpMsgReqnumPerSecond, Integer.valueOf(4816));
-        Assert.assertEquals(config.eventMeshTcpClientMaxNum, 5816);
-        Assert.assertEquals(config.eventMeshTcpGlobalScheduler, 6816);
-        Assert.assertEquals(config.eventMeshTcpTaskHandleExecutorPoolSize, 7816);
-        Assert.assertEquals(config.eventMeshTcpMsgDownStreamExecutorPoolSize, 8816);
-        Assert.assertEquals(config.eventMeshTcpSessionExpiredInMills, 1816);
-        Assert.assertEquals(config.eventMeshTcpSessionUpstreamBufferSize, 11816);
-        Assert.assertEquals(config.eventMeshTcpMsgAsyncRetryTimes, 12816);
-        Assert.assertEquals(config.eventMeshTcpMsgSyncRetryTimes, 13816);
-        Assert.assertEquals(config.eventMeshTcpMsgRetrySyncDelayInMills, 14816);
-        Assert.assertEquals(config.eventMeshTcpMsgRetryAsyncDelayInMills, 15816);
-        Assert.assertEquals(config.eventMeshTcpMsgRetryQueueSize, 16816);
-        Assert.assertEquals(config.eventMeshTcpRebalanceIntervalInMills, Integer.valueOf(17816));
-        Assert.assertEquals(config.eventMeshServerAdminPort, 18816);
-        Assert.assertEquals(config.eventMeshTcpSendBackEnabled, Boolean.TRUE);
-        Assert.assertEquals(config.eventMeshTcpSendBackMaxTimes, 3);
-        Assert.assertEquals(config.eventMeshTcpPushFailIsolateTimeInMills, 21816);
-        Assert.assertEquals(config.gracefulShutdownSleepIntervalInMills, 22816);
-        Assert.assertEquals(config.sleepIntervalInRebalanceRedirectMills, 23816);
-        Assert.assertEquals(config.eventMeshEventSize, 22816);
-        Assert.assertEquals(config.eventMeshEventBatchSize, 23816);
+        Assert.assertEquals(config.getEventMeshTcpServerPort(), 816);
+        Assert.assertEquals(config.getEventMeshTcpIdleAllSeconds(), 1816);
+        Assert.assertEquals(config.getEventMeshTcpIdleWriteSeconds(), 2816);
+        Assert.assertEquals(config.getEventMeshTcpIdleReadSeconds(), 3816);
+        Assert.assertEquals(config.getEventMeshTcpMsgReqnumPerSecond(), Integer.valueOf(4816));
+        Assert.assertEquals(config.getEventMeshTcpClientMaxNum(), 5816);
+        Assert.assertEquals(config.getEventMeshTcpGlobalScheduler(), 6816);
+        Assert.assertEquals(config.getEventMeshTcpTaskHandleExecutorPoolSize(), 7816);
+        Assert.assertEquals(config.getEventMeshTcpMsgDownStreamExecutorPoolSize(), 8816);
+        Assert.assertEquals(config.getEventMeshTcpSessionExpiredInMills(), 1816);
+        Assert.assertEquals(config.getEventMeshTcpSessionUpstreamBufferSize(), 11816);
+        Assert.assertEquals(config.getEventMeshTcpMsgAsyncRetryTimes(), 12816);
+        Assert.assertEquals(config.getEventMeshTcpMsgSyncRetryTimes(), 13816);
+        Assert.assertEquals(config.getEventMeshTcpMsgRetrySyncDelayInMills(), 14816);
+        Assert.assertEquals(config.getEventMeshTcpMsgRetryAsyncDelayInMills(), 15816);
+        Assert.assertEquals(config.getEventMeshTcpMsgRetryQueueSize(), 16816);
+        Assert.assertEquals(config.getEventMeshTcpRebalanceIntervalInMills(), Integer.valueOf(17816));
+        Assert.assertEquals(config.getEventMeshServerAdminPort(), 18816);
+        Assert.assertEquals(config.isEventMeshTcpSendBackEnabled(), Boolean.TRUE);
+        Assert.assertEquals(config.getEventMeshTcpSendBackMaxTimes(), 3);
+        Assert.assertEquals(config.getEventMeshTcpPushFailIsolateTimeInMills(), 21816);
+        Assert.assertEquals(config.getGracefulShutdownSleepIntervalInMills(), 22816);
+        Assert.assertEquals(config.getSleepIntervalInRebalanceRedirectMills(), 23816);
+        Assert.assertEquals(config.getEventMeshEventSize(), 22816);
+        Assert.assertEquals(config.getEventMeshEventBatchSize(), 23816);
     }
 
     private void assertCommonConfig(CommonConfiguration config) {
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java
index a26617842..1f3163221 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java
@@ -41,31 +41,31 @@ public class EventMeshTCPConfigurationTest {
     }
 
     private void assertTCPConfig(EventMeshTCPConfiguration config) {
-        Assert.assertEquals(config.eventMeshTcpServerPort, 816);
-        Assert.assertEquals(config.eventMeshTcpIdleAllSeconds, 1816);
-        Assert.assertEquals(config.eventMeshTcpIdleWriteSeconds, 2816);
-        Assert.assertEquals(config.eventMeshTcpIdleReadSeconds, 3816);
-        Assert.assertEquals(config.eventMeshTcpMsgReqnumPerSecond, Integer.valueOf(4816));
-        Assert.assertEquals(config.eventMeshTcpClientMaxNum, 5816);
-        Assert.assertEquals(config.eventMeshTcpGlobalScheduler, 6816);
-        Assert.assertEquals(config.eventMeshTcpTaskHandleExecutorPoolSize, 7816);
-        Assert.assertEquals(config.eventMeshTcpMsgDownStreamExecutorPoolSize, 8816);
-        Assert.assertEquals(config.eventMeshTcpSessionExpiredInMills, 1816);
-        Assert.assertEquals(config.eventMeshTcpSessionUpstreamBufferSize, 11816);
-        Assert.assertEquals(config.eventMeshTcpMsgAsyncRetryTimes, 12816);
-        Assert.assertEquals(config.eventMeshTcpMsgSyncRetryTimes, 13816);
-        Assert.assertEquals(config.eventMeshTcpMsgRetrySyncDelayInMills, 14816);
-        Assert.assertEquals(config.eventMeshTcpMsgRetryAsyncDelayInMills, 15816);
-        Assert.assertEquals(config.eventMeshTcpMsgRetryQueueSize, 16816);
-        Assert.assertEquals(config.eventMeshTcpRebalanceIntervalInMills, Integer.valueOf(17816));
-        Assert.assertEquals(config.eventMeshServerAdminPort, 18816);
-        Assert.assertEquals(config.eventMeshTcpSendBackEnabled, Boolean.TRUE);
-        Assert.assertEquals(config.eventMeshTcpSendBackMaxTimes, 3);
-        Assert.assertEquals(config.eventMeshTcpPushFailIsolateTimeInMills, 21816);
-        Assert.assertEquals(config.gracefulShutdownSleepIntervalInMills, 22816);
-        Assert.assertEquals(config.sleepIntervalInRebalanceRedirectMills, 23816);
-        Assert.assertEquals(config.eventMeshEventSize, 22816);
-        Assert.assertEquals(config.eventMeshEventBatchSize, 23816);
+        Assert.assertEquals(config.getEventMeshTcpServerPort(), 816);
+        Assert.assertEquals(config.getEventMeshTcpIdleAllSeconds(), 1816);
+        Assert.assertEquals(config.getEventMeshTcpIdleWriteSeconds(), 2816);
+        Assert.assertEquals(config.getEventMeshTcpIdleReadSeconds(), 3816);
+        Assert.assertEquals(config.getEventMeshTcpMsgReqnumPerSecond(), Integer.valueOf(4816));
+        Assert.assertEquals(config.getEventMeshTcpClientMaxNum(), 5816);
+        Assert.assertEquals(config.getEventMeshTcpGlobalScheduler(), 6816);
+        Assert.assertEquals(config.getEventMeshTcpTaskHandleExecutorPoolSize(), 7816);
+        Assert.assertEquals(config.getEventMeshTcpMsgDownStreamExecutorPoolSize(), 8816);
+        Assert.assertEquals(config.getEventMeshTcpSessionExpiredInMills(), 1816);
+        Assert.assertEquals(config.getEventMeshTcpSessionUpstreamBufferSize(), 11816);
+        Assert.assertEquals(config.getEventMeshTcpMsgAsyncRetryTimes(), 12816);
+        Assert.assertEquals(config.getEventMeshTcpMsgSyncRetryTimes(), 13816);
+        Assert.assertEquals(config.getEventMeshTcpMsgRetrySyncDelayInMills(), 14816);
+        Assert.assertEquals(config.getEventMeshTcpMsgRetryAsyncDelayInMills(), 15816);
+        Assert.assertEquals(config.getEventMeshTcpMsgRetryQueueSize(), 16816);
+        Assert.assertEquals(config.getEventMeshTcpRebalanceIntervalInMills(), Integer.valueOf(17816));
+        Assert.assertEquals(config.getEventMeshServerAdminPort(), 18816);
+        Assert.assertEquals(config.isEventMeshTcpSendBackEnabled(), Boolean.TRUE);
+        Assert.assertEquals(config.getEventMeshTcpSendBackMaxTimes(), 3);
+        Assert.assertEquals(config.getEventMeshTcpPushFailIsolateTimeInMills(), 21816);
+        Assert.assertEquals(config.getGracefulShutdownSleepIntervalInMills(), 22816);
+        Assert.assertEquals(config.getSleepIntervalInRebalanceRedirectMills(), 23816);
+        Assert.assertEquals(config.getEventMeshEventSize(), 22816);
+        Assert.assertEquals(config.getEventMeshEventBatchSize(), 23816);
     }
 
     private void assertCommonConfig(CommonConfiguration config) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org