You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/02/12 04:36:12 UTC
[incubator-eventmesh] branch master updated: [ISSUE #3120]Change the EventMeshTCPConfiguration property from public to private (#3121)
This is an automated email from the ASF dual-hosted git repository.
jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new af7b6fbbc [ISSUE #3120]Change the EventMeshTCPConfiguration property from public to private (#3121)
af7b6fbbc is described below
commit af7b6fbbcc0296bd3a1554184e1e0cc2e9429249
Author: mxsm <lj...@gmail.com>
AuthorDate: Sun Feb 12 12:36:06 2023 +0800
[ISSUE #3120]Change the EventMeshTCPConfiguration property from public to private (#3121)
---
.../admin/controller/ClientManageController.java | 2 +-
.../admin/handler/ConfigurationHandler.java | 2 +-
.../eventmesh/runtime/boot/EventMeshTCPServer.java | 24 ++++-----
.../configuration/EventMeshTCPConfiguration.java | 62 +++++++++++-----------
.../tcp/client/EventMeshTcpConnectionHandler.java | 2 +-
.../tcp/client/group/ClientGroupWrapper.java | 10 ++--
.../client/group/ClientSessionGroupMapping.java | 7 ++-
.../rebalance/EventMeshRebalanceService.java | 2 +-
.../client/rebalance/EventmeshRebalanceImpl.java | 2 +-
.../tcp/client/session/push/SessionPusher.java | 6 +--
.../client/session/retry/EventMeshTcpRetryer.java | 10 ++--
.../tcp/client/session/send/SessionSender.java | 2 +-
.../tcp/client/task/MessageTransferTask.java | 6 +--
.../runtime/boot/EventMeshServerTest.java | 50 ++++++++---------
.../EventMeshTCPConfigurationTest.java | 50 ++++++++---------
15 files changed, 116 insertions(+), 121 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
index ca4e8484d..4b04c9200 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
@@ -88,7 +88,7 @@ public class ClientManageController {
public void start() throws IOException {
- int port = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerAdminPort;
+ int port = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshServerAdminPort();
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
HttpHandlerManager httpHandlerManager = new HttpHandlerManager();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ConfigurationHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ConfigurationHandler.java
index 3324f0f0c..afd702a62 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ConfigurationHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ConfigurationHandler.java
@@ -93,7 +93,7 @@ public class ConfigurationHandler extends AbstractHttpHandler {
eventMeshTCPConfiguration.isEventMeshServerSecurityEnable(),
eventMeshTCPConfiguration.isEventMeshServerRegistryEnable(),
// TCP Configuration
- eventMeshTCPConfiguration.eventMeshTcpServerPort,
+ eventMeshTCPConfiguration.getEventMeshTcpServerPort(),
// HTTP Configuration
eventMeshHTTPConfiguration.getHttpServerPort(),
eventMeshHTTPConfiguration.isEventMeshServerUseTls(),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index f4f89ada1..034662636 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -150,9 +150,9 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
.addLast(
getWorkerGroup(),
new IdleStateHandler(
- eventMeshTCPConfiguration.eventMeshTcpIdleReadSeconds,
- eventMeshTCPConfiguration.eventMeshTcpIdleWriteSeconds,
- eventMeshTCPConfiguration.eventMeshTcpIdleAllSeconds),
+ eventMeshTCPConfiguration.getEventMeshTcpIdleReadSeconds(),
+ eventMeshTCPConfiguration.getEventMeshTcpIdleWriteSeconds(),
+ eventMeshTCPConfiguration.getEventMeshTcpIdleAllSeconds()),
new EventMeshTcpMessageDispatcher(EventMeshTCPServer.this),
new EventMeshTcpExceptionHandler(EventMeshTCPServer.this)
);
@@ -176,7 +176,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
.childHandler(channelInitializer);
try {
- int port = eventMeshTCPConfiguration.eventMeshTcpServerPort;
+ int port = eventMeshTCPConfiguration.getEventMeshTcpServerPort();
ChannelFuture f = bootstrap.bind(port).sync();
LOGGER.info("EventMeshTCPServer[port={}] started.....", port);
f.channel().closeFuture().sync();
@@ -200,7 +200,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
}
initThreadPool();
- rateLimiter = RateLimiter.create(eventMeshTCPConfiguration.eventMeshTcpMsgReqnumPerSecond);
+ rateLimiter = RateLimiter.create(eventMeshTCPConfiguration.getEventMeshTcpMsgReqnumPerSecond());
globalTrafficShapingHandler = newGTSHandler(scheduler, eventMeshTCPConfiguration.getGtc().getReadLimit());
@@ -300,7 +300,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
boolean registerResult = false;
try {
String endPoints = IPUtils.getLocalAddress()
- + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
+ + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.getEventMeshTcpServerPort();
EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
eventMeshRegisterInfo.setEventMeshClusterName(eventMeshTCPConfiguration.getEventMeshCluster());
eventMeshRegisterInfo.setEventMeshName(eventMeshTCPConfiguration.getEventMeshName() + "-"
@@ -318,7 +318,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
private void unRegister() throws Exception {
String endPoints = IPUtils.getLocalAddress()
- + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
+ + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.getEventMeshTcpServerPort();
EventMeshUnRegisterInfo eventMeshUnRegisterInfo = new EventMeshUnRegisterInfo();
eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshTCPConfiguration.getEventMeshCluster());
eventMeshUnRegisterInfo.setEventMeshName(eventMeshTCPConfiguration.getEventMeshName());
@@ -333,18 +333,18 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
private void initThreadPool() throws Exception {
super.init("eventMesh-tcp");
- scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.eventMeshTcpGlobalScheduler,
+ scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.getEventMeshTcpGlobalScheduler(),
new EventMeshThreadFactory("eventMesh-tcp-scheduler", true));
taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
- eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
- eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
+ eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(),
+ eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(),
new LinkedBlockingQueue<>(10_000),
new EventMeshThreadFactory("eventMesh-tcp-task-handle", true));
broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
- eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
- eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
+ eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(),
+ eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(),
new LinkedBlockingQueue<>(10_000),
new EventMeshThreadFactory("eventMesh-tcp-msg-downstream", true));
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
index d89d043e1..e210bb276 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
@@ -31,95 +31,95 @@ import lombok.NoArgsConstructor;
public class EventMeshTCPConfiguration extends CommonConfiguration {
@ConfigFiled(field = "tcp.port")
- public int eventMeshTcpServerPort = 10000;
+ private int eventMeshTcpServerPort = 10000;
@ConfigFiled(field = "tcp.allIdleSeconds")
- public int eventMeshTcpIdleAllSeconds = 60;
+ private int eventMeshTcpIdleAllSeconds = 60;
@ConfigFiled(field = "tcp.writerIdleSeconds")
- public int eventMeshTcpIdleWriteSeconds = 60;
+ private int eventMeshTcpIdleWriteSeconds = 60;
@ConfigFiled(field = "tcp.readerIdleSeconds")
- public int eventMeshTcpIdleReadSeconds = 60;
+ private int eventMeshTcpIdleReadSeconds = 60;
@ConfigFiled(field = "tcp.msgReqnumPerSecond")
- public Integer eventMeshTcpMsgReqnumPerSecond = 15000;
+ private Integer eventMeshTcpMsgReqnumPerSecond = 15000;
/**
* TCP Server allows max client num
*/
@ConfigFiled(field = "tcp.clientMaxNum")
- public int eventMeshTcpClientMaxNum = 10000;
+ private int eventMeshTcpClientMaxNum = 10000;
//======================================= New add config =================================
@ConfigFiled(field = "global.scheduler")
- public int eventMeshTcpGlobalScheduler = 5;
+ private int eventMeshTcpGlobalScheduler = 5;
@ConfigFiled(field = "tcp.taskHandleExecutorPoolSize")
- public int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors();
+ private int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors();
@ConfigFiled(field = "tcp.msgDownStreamExecutorPoolSize")
- public int eventMeshTcpMsgDownStreamExecutorPoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8);
+ private int eventMeshTcpMsgDownStreamExecutorPoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8);
@ConfigFiled(field = "session.expiredInMills")
- public int eventMeshTcpSessionExpiredInMills = 60000;
+ private int eventMeshTcpSessionExpiredInMills = 60000;
@ConfigFiled(field = "session.upstreamBufferSize")
- public int eventMeshTcpSessionUpstreamBufferSize = 100;
+ private int eventMeshTcpSessionUpstreamBufferSize = 100;
@ConfigFiled(field = "retry.async.pushRetryTimes")
- public int eventMeshTcpMsgAsyncRetryTimes = 3;
+ private int eventMeshTcpMsgAsyncRetryTimes = 3;
@ConfigFiled(field = "retry.sync.pushRetryTimes")
- public int eventMeshTcpMsgSyncRetryTimes = 1;
+ private int eventMeshTcpMsgSyncRetryTimes = 1;
@ConfigFiled(field = "retry.sync.pushRetryDelayInMills")
- public int eventMeshTcpMsgRetrySyncDelayInMills = 500;
+ private int eventMeshTcpMsgRetrySyncDelayInMills = 500;
@ConfigFiled(field = "retry.async.pushRetryDelayInMills")
- public int eventMeshTcpMsgRetryAsyncDelayInMills = 500;
+ private int eventMeshTcpMsgRetryAsyncDelayInMills = 500;
@ConfigFiled(field = "retry.pushRetryQueueSize")
- public int eventMeshTcpMsgRetryQueueSize = 10000;
+ private int eventMeshTcpMsgRetryQueueSize = 10000;
@ConfigFiled(field = "tcp.RebalanceIntervalInMills")
- public Integer eventMeshTcpRebalanceIntervalInMills = 30 * 1000;
+ private Integer eventMeshTcpRebalanceIntervalInMills = 30 * 1000;
@ConfigFiled(field = "admin.http.port")
- public int eventMeshServerAdminPort = 10106;
+ private int eventMeshServerAdminPort = 10106;
@ConfigFiled(field = "tcp.sendBack.enabled")
- public boolean eventMeshTcpSendBackEnabled = Boolean.TRUE;
+ private boolean eventMeshTcpSendBackEnabled = Boolean.TRUE;
- @ConfigFiled(field = "")
- public int eventMeshTcpSendBackMaxTimes = 3;
+ @ConfigFiled(field = "tcp.SendBackMaxTimes")
+ private int eventMeshTcpSendBackMaxTimes = 3;
@ConfigFiled(field = "tcp.pushFailIsolateTimeInMills")
- public int eventMeshTcpPushFailIsolateTimeInMills = 30 * 1000;
+ private int eventMeshTcpPushFailIsolateTimeInMills = 30 * 1000;
@ConfigFiled(field = "gracefulShutdown.sleepIntervalInMills")
- public int gracefulShutdownSleepIntervalInMills = 1000;
+ private int gracefulShutdownSleepIntervalInMills = 1000;
@ConfigFiled(field = "rebalanceRedirect.sleepIntervalInM")
- public int sleepIntervalInRebalanceRedirectMills = 200;
+ private int sleepIntervalInRebalanceRedirectMills = 200;
@ConfigFiled(field = "maxEventSize")
- public int eventMeshEventSize = 1000;
+ private int eventMeshEventSize = 1000;
@ConfigFiled(field = "maxEventBatchSize")
- public int eventMeshEventBatchSize = 10;
+ private int eventMeshEventBatchSize = 10;
- private final TrafficShapingConfig gtc = new TrafficShapingConfig(0, 10_000, 1_000, 2000);
+ private final TrafficShapingConfig gtc = new TrafficShapingConfig(0, 10_000, 1_000, 2_000);
private final TrafficShapingConfig ctc = new TrafficShapingConfig(0, 2_000, 1_000, 10_000);
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class TrafficShapingConfig {
- long writeLimit = 0;
- long readLimit = 1000;
- long checkInterval = 1000;
- long maxTime = 5000;
+ private long writeLimit = 0;
+ private long readLimit = 1000;
+ private long checkInterval = 1000;
+ private long maxTime = 5000;
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
index 4b3a10052..fe2f45f88 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
@@ -60,7 +60,7 @@ public class EventMeshTcpConnectionHandler extends ChannelDuplexHandler {
logger.info("client|tcp|channelActive|remoteAddress={}|msg={}", remoteAddress, "");
int c = connections.incrementAndGet();
- if (c > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpClientMaxNum) {
+ if (c > eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpClientMaxNum()) {
logger.warn("client|tcp|channelActive|remoteAddress={}|msg={}", remoteAddress, "too many client connect "
+
"this eventMesh server");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 245cf311e..c4f8de7ca 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -486,13 +486,9 @@ public class ClientGroupWrapper {
group, topic, bizSeqNo, sendBackTimes,
sendBackFromEventMeshIp);
- if (Objects.requireNonNull(sendBackTimes) >= eventMeshTCPServer
- .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
- log.error(
- "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
- + "bizSeqNo:{}", eventMeshTCPServer
- .getEventMeshTCPConfiguration()
- .eventMeshTcpSendBackMaxTimes,
+ int eventMeshTcpSendBackMaxTimes = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpSendBackMaxTimes();
+ if (Objects.requireNonNull(sendBackTimes) >= eventMeshTcpSendBackMaxTimes) {
+ log.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, " + "bizSeqNo:{}", eventMeshTcpSendBackMaxTimes,
group, topic, bizSeqNo);
} else {
sendBackTimes++;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 2ebc2aa64..b9c6547f8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -361,7 +361,7 @@ public class ClientSessionGroupMapping {
() -> {
for (Session tmp : sessionTable.values()) {
if (System.currentTimeMillis() - tmp.getLastHeartbeatTime()
- > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) {
+ > eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpSessionExpiredInMills()) {
try {
if (log.isWarnEnabled()) {
log.warn("clean expired session,client:{}", tmp.getClient());
@@ -372,8 +372,7 @@ public class ClientSessionGroupMapping {
}
}
}
- }, 1000, eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills,
- TimeUnit.MILLISECONDS);
+ }, 1000, eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpSessionExpiredInMills(), TimeUnit.MILLISECONDS);
}
private void initDownStreamMsgContextCleaner() {
@@ -427,7 +426,7 @@ public class ClientSessionGroupMapping {
}
}
try {
- Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().gracefulShutdownSleepIntervalInMills);
+ Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().getGracefulShutdownSleepIntervalInMills());
} catch (InterruptedException e) {
log.warn("Thread.sleep occur InterruptedException", e);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
index 48406c38a..20f087a07 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java
@@ -43,7 +43,7 @@ public class EventMeshRebalanceService {
public EventMeshRebalanceService(EventMeshTCPServer eventMeshTCPServer, EventMeshRebalanceStrategy rebalanceStrategy) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.rebalanceStrategy = rebalanceStrategy;
- this.rebalanceIntervalMills = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpRebalanceIntervalInMills;
+ this.rebalanceIntervalMills = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpRebalanceIntervalInMills();
}
public void init() {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
index 26405f8ec..ad51bcb32 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
@@ -171,7 +171,7 @@ public class EventmeshRebalanceImpl implements EventMeshRebalanceStrategy {
Integer.parseInt(newProxyPort), sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
logger.info("doRebalance,redirect sessionAddr:{}", redirectSessionAddr);
try {
- Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().sleepIntervalInRebalanceRedirectMills);
+ Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().getSleepIntervalInRebalanceRedirectMills());
} catch (InterruptedException e) {
logger.warn("Thread.sleep occur InterruptedException", e);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
index ba30abaec..34942b983 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
@@ -129,14 +129,14 @@ public class SessionPusher {
//how long to isolate client when push fail
long isolateTime = System.currentTimeMillis()
- + session.getEventMeshTCPConfiguration().eventMeshTcpPushFailIsolateTimeInMills;
+ + session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
session.setIsolateTime(isolateTime);
logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);
//retry
long delayTime = SubscriptionType.SYNC == downStreamMsgContext.subscriptionItem.getType()
- ? session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetrySyncDelayInMills
- : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryAsyncDelayInMills;
+ ? session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
+ : session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
downStreamMsgContext.delay(delayTime);
Objects.requireNonNull(session.getClientGroupWrapper().get()).getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
} else {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
index 8f6d2e8ad..8efa9d71d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
@@ -61,19 +61,19 @@ public class EventMeshTcpRetryer {
}
public void pushRetry(RetryContext retryContext) {
- if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize) {
+ if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize()) {
logger.error("pushRetry fail,retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}",
- eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize, retryContext.retryTimes,
+ eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize(), retryContext.retryTimes,
retryContext.seq, EventMeshUtil.getMessageBizSeq(retryContext.event));
return;
}
- int maxRetryTimes = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
+ int maxRetryTimes = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgAsyncRetryTimes();
if (retryContext instanceof DownStreamMsgContext) {
DownStreamMsgContext downStreamMsgContext = (DownStreamMsgContext) retryContext;
maxRetryTimes = SubscriptionType.SYNC == downStreamMsgContext.subscriptionItem.getType()
- ? eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes :
- eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
+ ? eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgSyncRetryTimes() :
+ eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgAsyncRetryTimes();
}
if (retryContext.retryTimes >= maxRetryTimes) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
index 898f08cca..b5b7500f7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
@@ -83,7 +83,7 @@ public class SessionSender {
public SessionSender(Session session) {
this.session = session;
- this.upstreamBuff = new Semaphore(session.getEventMeshTCPConfiguration().eventMeshTcpSessionUpstreamBufferSize);
+ this.upstreamBuff = new Semaphore(session.getEventMeshTCPConfiguration().getEventMeshTcpSessionUpstreamBufferSize());
}
public EventMeshTcpSendResult send(Header header, CloudEvent event, SendCallback sendCallback, long startTime,
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index ab35a727d..0c13db839 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -114,9 +114,9 @@ public class MessageTransferTask extends AbstractTask {
}
String content = new String(Objects.requireNonNull(event.getData()).toBytes(), StandardCharsets.UTF_8);
- if (content.length() > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize) {
- throw new Exception("event size exceeds the limit: "
- + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize);
+ int eventMeshEventSize = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshEventSize();
+ if (content.length() > eventMeshEventSize) {
+ throw new Exception("event size exceeds the limit: " + eventMeshEventSize);
}
//do acl check in sending msg
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java
index 84aedc1fd..f038810d5 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java
@@ -60,31 +60,31 @@ public class EventMeshServerTest {
}
private void assertTCPConfig(EventMeshTCPConfiguration config) {
- Assert.assertEquals(config.eventMeshTcpServerPort, 816);
- Assert.assertEquals(config.eventMeshTcpIdleAllSeconds, 1816);
- Assert.assertEquals(config.eventMeshTcpIdleWriteSeconds, 2816);
- Assert.assertEquals(config.eventMeshTcpIdleReadSeconds, 3816);
- Assert.assertEquals(config.eventMeshTcpMsgReqnumPerSecond, Integer.valueOf(4816));
- Assert.assertEquals(config.eventMeshTcpClientMaxNum, 5816);
- Assert.assertEquals(config.eventMeshTcpGlobalScheduler, 6816);
- Assert.assertEquals(config.eventMeshTcpTaskHandleExecutorPoolSize, 7816);
- Assert.assertEquals(config.eventMeshTcpMsgDownStreamExecutorPoolSize, 8816);
- Assert.assertEquals(config.eventMeshTcpSessionExpiredInMills, 1816);
- Assert.assertEquals(config.eventMeshTcpSessionUpstreamBufferSize, 11816);
- Assert.assertEquals(config.eventMeshTcpMsgAsyncRetryTimes, 12816);
- Assert.assertEquals(config.eventMeshTcpMsgSyncRetryTimes, 13816);
- Assert.assertEquals(config.eventMeshTcpMsgRetrySyncDelayInMills, 14816);
- Assert.assertEquals(config.eventMeshTcpMsgRetryAsyncDelayInMills, 15816);
- Assert.assertEquals(config.eventMeshTcpMsgRetryQueueSize, 16816);
- Assert.assertEquals(config.eventMeshTcpRebalanceIntervalInMills, Integer.valueOf(17816));
- Assert.assertEquals(config.eventMeshServerAdminPort, 18816);
- Assert.assertEquals(config.eventMeshTcpSendBackEnabled, Boolean.TRUE);
- Assert.assertEquals(config.eventMeshTcpSendBackMaxTimes, 3);
- Assert.assertEquals(config.eventMeshTcpPushFailIsolateTimeInMills, 21816);
- Assert.assertEquals(config.gracefulShutdownSleepIntervalInMills, 22816);
- Assert.assertEquals(config.sleepIntervalInRebalanceRedirectMills, 23816);
- Assert.assertEquals(config.eventMeshEventSize, 22816);
- Assert.assertEquals(config.eventMeshEventBatchSize, 23816);
+ Assert.assertEquals(config.getEventMeshTcpServerPort(), 816);
+ Assert.assertEquals(config.getEventMeshTcpIdleAllSeconds(), 1816);
+ Assert.assertEquals(config.getEventMeshTcpIdleWriteSeconds(), 2816);
+ Assert.assertEquals(config.getEventMeshTcpIdleReadSeconds(), 3816);
+ Assert.assertEquals(config.getEventMeshTcpMsgReqnumPerSecond(), Integer.valueOf(4816));
+ Assert.assertEquals(config.getEventMeshTcpClientMaxNum(), 5816);
+ Assert.assertEquals(config.getEventMeshTcpGlobalScheduler(), 6816);
+ Assert.assertEquals(config.getEventMeshTcpTaskHandleExecutorPoolSize(), 7816);
+ Assert.assertEquals(config.getEventMeshTcpMsgDownStreamExecutorPoolSize(), 8816);
+ Assert.assertEquals(config.getEventMeshTcpSessionExpiredInMills(), 1816);
+ Assert.assertEquals(config.getEventMeshTcpSessionUpstreamBufferSize(), 11816);
+ Assert.assertEquals(config.getEventMeshTcpMsgAsyncRetryTimes(), 12816);
+ Assert.assertEquals(config.getEventMeshTcpMsgSyncRetryTimes(), 13816);
+ Assert.assertEquals(config.getEventMeshTcpMsgRetrySyncDelayInMills(), 14816);
+ Assert.assertEquals(config.getEventMeshTcpMsgRetryAsyncDelayInMills(), 15816);
+ Assert.assertEquals(config.getEventMeshTcpMsgRetryQueueSize(), 16816);
+ Assert.assertEquals(config.getEventMeshTcpRebalanceIntervalInMills(), Integer.valueOf(17816));
+ Assert.assertEquals(config.getEventMeshServerAdminPort(), 18816);
+ Assert.assertEquals(config.isEventMeshTcpSendBackEnabled(), Boolean.TRUE);
+ Assert.assertEquals(config.getEventMeshTcpSendBackMaxTimes(), 3);
+ Assert.assertEquals(config.getEventMeshTcpPushFailIsolateTimeInMills(), 21816);
+ Assert.assertEquals(config.getGracefulShutdownSleepIntervalInMills(), 22816);
+ Assert.assertEquals(config.getSleepIntervalInRebalanceRedirectMills(), 23816);
+ Assert.assertEquals(config.getEventMeshEventSize(), 22816);
+ Assert.assertEquals(config.getEventMeshEventBatchSize(), 23816);
}
private void assertCommonConfig(CommonConfiguration config) {
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java
index a26617842..1f3163221 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java
@@ -41,31 +41,31 @@ public class EventMeshTCPConfigurationTest {
}
private void assertTCPConfig(EventMeshTCPConfiguration config) {
- Assert.assertEquals(config.eventMeshTcpServerPort, 816);
- Assert.assertEquals(config.eventMeshTcpIdleAllSeconds, 1816);
- Assert.assertEquals(config.eventMeshTcpIdleWriteSeconds, 2816);
- Assert.assertEquals(config.eventMeshTcpIdleReadSeconds, 3816);
- Assert.assertEquals(config.eventMeshTcpMsgReqnumPerSecond, Integer.valueOf(4816));
- Assert.assertEquals(config.eventMeshTcpClientMaxNum, 5816);
- Assert.assertEquals(config.eventMeshTcpGlobalScheduler, 6816);
- Assert.assertEquals(config.eventMeshTcpTaskHandleExecutorPoolSize, 7816);
- Assert.assertEquals(config.eventMeshTcpMsgDownStreamExecutorPoolSize, 8816);
- Assert.assertEquals(config.eventMeshTcpSessionExpiredInMills, 1816);
- Assert.assertEquals(config.eventMeshTcpSessionUpstreamBufferSize, 11816);
- Assert.assertEquals(config.eventMeshTcpMsgAsyncRetryTimes, 12816);
- Assert.assertEquals(config.eventMeshTcpMsgSyncRetryTimes, 13816);
- Assert.assertEquals(config.eventMeshTcpMsgRetrySyncDelayInMills, 14816);
- Assert.assertEquals(config.eventMeshTcpMsgRetryAsyncDelayInMills, 15816);
- Assert.assertEquals(config.eventMeshTcpMsgRetryQueueSize, 16816);
- Assert.assertEquals(config.eventMeshTcpRebalanceIntervalInMills, Integer.valueOf(17816));
- Assert.assertEquals(config.eventMeshServerAdminPort, 18816);
- Assert.assertEquals(config.eventMeshTcpSendBackEnabled, Boolean.TRUE);
- Assert.assertEquals(config.eventMeshTcpSendBackMaxTimes, 3);
- Assert.assertEquals(config.eventMeshTcpPushFailIsolateTimeInMills, 21816);
- Assert.assertEquals(config.gracefulShutdownSleepIntervalInMills, 22816);
- Assert.assertEquals(config.sleepIntervalInRebalanceRedirectMills, 23816);
- Assert.assertEquals(config.eventMeshEventSize, 22816);
- Assert.assertEquals(config.eventMeshEventBatchSize, 23816);
+ Assert.assertEquals(config.getEventMeshTcpServerPort(), 816);
+ Assert.assertEquals(config.getEventMeshTcpIdleAllSeconds(), 1816);
+ Assert.assertEquals(config.getEventMeshTcpIdleWriteSeconds(), 2816);
+ Assert.assertEquals(config.getEventMeshTcpIdleReadSeconds(), 3816);
+ Assert.assertEquals(config.getEventMeshTcpMsgReqnumPerSecond(), Integer.valueOf(4816));
+ Assert.assertEquals(config.getEventMeshTcpClientMaxNum(), 5816);
+ Assert.assertEquals(config.getEventMeshTcpGlobalScheduler(), 6816);
+ Assert.assertEquals(config.getEventMeshTcpTaskHandleExecutorPoolSize(), 7816);
+ Assert.assertEquals(config.getEventMeshTcpMsgDownStreamExecutorPoolSize(), 8816);
+ Assert.assertEquals(config.getEventMeshTcpSessionExpiredInMills(), 1816);
+ Assert.assertEquals(config.getEventMeshTcpSessionUpstreamBufferSize(), 11816);
+ Assert.assertEquals(config.getEventMeshTcpMsgAsyncRetryTimes(), 12816);
+ Assert.assertEquals(config.getEventMeshTcpMsgSyncRetryTimes(), 13816);
+ Assert.assertEquals(config.getEventMeshTcpMsgRetrySyncDelayInMills(), 14816);
+ Assert.assertEquals(config.getEventMeshTcpMsgRetryAsyncDelayInMills(), 15816);
+ Assert.assertEquals(config.getEventMeshTcpMsgRetryQueueSize(), 16816);
+ Assert.assertEquals(config.getEventMeshTcpRebalanceIntervalInMills(), Integer.valueOf(17816));
+ Assert.assertEquals(config.getEventMeshServerAdminPort(), 18816);
+ Assert.assertEquals(config.isEventMeshTcpSendBackEnabled(), Boolean.TRUE);
+ Assert.assertEquals(config.getEventMeshTcpSendBackMaxTimes(), 3);
+ Assert.assertEquals(config.getEventMeshTcpPushFailIsolateTimeInMills(), 21816);
+ Assert.assertEquals(config.getGracefulShutdownSleepIntervalInMills(), 22816);
+ Assert.assertEquals(config.getSleepIntervalInRebalanceRedirectMills(), 23816);
+ Assert.assertEquals(config.getEventMeshEventSize(), 22816);
+ Assert.assertEquals(config.getEventMeshEventBatchSize(), 23816);
}
private void assertCommonConfig(CommonConfiguration config) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org