You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by we...@apache.org on 2022/02/13 01:50:41 UTC
[incubator-eventmesh] branch master updated: [Issue #655] Adding send message constraints for message size and batch size (#760)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 85de5c7 [Issue #655] Adding send message constraints for message size and batch size (#760)
85de5c7 is described below
commit 85de5c7a6f0ae2533257babb7b7cdb9b8f566c3a
Author: jinrongluo <ka...@gmail.com>
AuthorDate: Sat Feb 12 20:50:36 2022 -0500
[Issue #655] Adding send message constraints for message size and batch size (#760)
* [Issue #655] Adding send message constraints for message size and batch size
---
.../tcp/demo/pub/cloudevents/AsyncPublish.java | 2 +-
eventmesh-runtime/conf/eventmesh.properties | 9 ++++++--
.../configuration/EventMeshHTTPConfiguration.java | 22 ++++++++++++++++--
.../configuration/EventMeshTCPConfiguration.java | 10 +++++++++
.../http/processor/BatchSendMessageProcessor.java | 26 ++++++++++++++++++++++
.../processor/BatchSendMessageV2Processor.java | 14 ++++++++++++
.../http/processor/ReplyMessageProcessor.java | 14 ++++++++++++
.../http/processor/SendAsyncMessageProcessor.java | 14 ++++++++++++
.../http/processor/SendSyncMessageProcessor.java | 14 ++++++++++++
.../tcp/client/task/MessageTransferTask.java | 7 ++++++
10 files changed, 127 insertions(+), 5 deletions(-)
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
index df96422..7621885 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
@@ -55,7 +55,7 @@ public class AsyncPublish {
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
client.init();
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 2; i++) {
CloudEvent event = EventMeshTestUtils.generateCloudEventV1Async();
logger.info("begin send async msg[{}]==================={}", i, event);
client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties
index 23425e1..84cedc9 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -39,6 +39,11 @@ eventMesh.server.tcp.msgReqnumPerSecond=15000
eventMesh.server.http.msgReqnumPerSecond=15000
eventMesh.server.session.upstreamBufferSize=20
+# for single event publish, maximum size allowed per event
+eventMesh.server.maxEventSize=1000
+# for batch event publish, maximum number of events allowed in one batch
+eventMesh.server.maxEventBatchSize=10
+
# thread number about global scheduler
eventMesh.server.global.scheduler=5
eventMesh.server.tcp.taskHandleExecutorPoolSize=8
@@ -61,8 +66,8 @@ eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000
eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200
#ip address blacklist
-eventmesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
-eventmesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8
+eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
+eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8
#connector plugin
eventMesh.connector.plugin.type=standalone
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
index a71b814..c0699c9 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
@@ -83,6 +83,10 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
public int eventMeshBatchMsgRequestNumPerSecond = 20000;
+ public int eventMeshEventSize = 1000;
+
+ public int eventMeshEventBatchSize = 10;
+
public List<IPAddress> eventMeshIpv4BlackList = Collections.emptyList();
public List<IPAddress> eventMeshIpv6BlackList = Collections.emptyList();
@@ -266,6 +270,16 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
}
+ String eventSize = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_EVENTSIZE);
+ if (StringUtils.isNotEmpty(eventSize) && StringUtils.isNumeric(eventSize)) {
+ eventMeshEventSize = Integer.parseInt(eventSize);
+ }
+
+ String eventBatchSize = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE);
+ if (StringUtils.isNotEmpty(eventBatchSize) && StringUtils.isNumeric(eventBatchSize)) {
+ eventMeshEventBatchSize = Integer.parseInt(eventBatchSize);
+ }
+
String ipv4BlackList = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST);
if (StringUtils.isNotEmpty(ipv4BlackList)) {
eventMeshIpv4BlackList = getBlacklist(ipv4BlackList);
@@ -339,8 +353,12 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
public static String KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND = "eventMesh.server.http.msgReqnumPerSecond";
- public static String KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST = "eventmesh.server.blacklist.ipv4";
+ public static String KEY_EVENTMESH_SERVER_EVENTSIZE = "eventMesh.server.maxEventSize";
+
+ public static String KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventMesh.server.maxEventBatchSize";
+
+ public static String KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST = "eventMesh.server.blacklist.ipv4";
- public static String KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST = "eventmesh.server.blacklist.ipv6";
+ public static String KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST = "eventMesh.server.blacklist.ipv6";
}
}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
index 0ab59cb..5544b0b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
@@ -76,6 +76,10 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {
public int sleepIntervalInRebalanceRedirectMills = 200;
+ public int eventMeshEventSize = 1000;
+
+ public int eventMeshEventBatchSize = 10;
+
private TrafficShapingConfig gtc = new TrafficShapingConfig(0, 10_000, 1_000, 2000);
private TrafficShapingConfig ctc = new TrafficShapingConfig(0, 2_000, 1_000, 10_000);
@@ -155,6 +159,10 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {
sleepIntervalInRebalanceRedirectMills = configurationWrapper.getIntProp(
ConfKeys.KEYS_EVENTMESH_SERVER_REBALANCE_REDIRECT_SLEEP_TIME, sleepIntervalInRebalanceRedirectMills);
+ eventMeshEventSize = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_EVENTSIZE, eventMeshEventSize);
+
+ eventMeshEventBatchSize = configurationWrapper.getIntProp(
+ ConfKeys.KEYS_EVENTMESH_SERVER_EVENT_BATCHSIZE, eventMeshEventBatchSize);
}
public TrafficShapingConfig getGtc() {
@@ -191,6 +199,8 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {
public static String KEYS_EVENTMESH_SERVER_PUSH_FAIL_ISOLATE_TIME = "eventMesh.server.tcp.pushFailIsolateTimeInMills";
public static String KEYS_EVENTMESH_SERVER_GRACEFUL_SHUTDOWN_SLEEP_TIME = "eventMesh.server.gracefulShutdown.sleepIntervalInMills";
public static String KEYS_EVENTMESH_SERVER_REBALANCE_REDIRECT_SLEEP_TIME = "eventMesh.server.rebalanceRedirect.sleepIntervalInM";
+ public static String KEYS_EVENTMESH_SERVER_EVENTSIZE = "eventMesh.server.maxEventSize";
+ public static String KEYS_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventMesh.server.maxEventBatchSize";
}
public static class TrafficShapingConfig {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index 2f2103b..13c0605 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -46,6 +46,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -111,6 +112,18 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
String producerGroup = "";
int eventSize = eventList.size();
+ if (eventSize > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize) {
+ batchMessageLogger.error("Event batch size exceeds the limit: {}",
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ sendMessageBatchResponseHeader,
+ SendMessageBatchResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ "Event batch size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize));
+ asyncContext.onComplete(responseEventMeshCommand);
+ return;
+ }
+
for (CloudEvent event : eventList) {
//validate event
if (StringUtils.isBlank(event.getId())
@@ -126,6 +139,19 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
return;
}
+ String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+ if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+ batchMessageLogger.error("Event size exceeds the limit: {}",
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ sendMessageBatchResponseHeader,
+ SendMessageBatchResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+ "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+ asyncContext.onComplete(responseEventMeshCommand);
+ return;
+ }
+
String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC)).toString();
String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID)).toString();
String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index 9b9f44e..2b23d78 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -46,6 +46,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.commons.lang3.StringUtils;
+import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -158,6 +159,19 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
return;
}
+ String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+ if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+ batchMessageLogger.error("Event size exceeds the limit: {}",
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ sendMessageBatchV2ResponseHeader,
+ SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+ asyncContext.onComplete(responseEventMeshCommand);
+ return;
+ }
+
//do acl check
if (eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
index 45ec636..f35eb0e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
@@ -46,6 +46,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.commons.lang3.StringUtils;
+import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -152,6 +153,19 @@ public class ReplyMessageProcessor implements HttpRequestProcessor {
return;
}
+ String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+ if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+ httpLogger.error("Event size exceeds the limit: {}",
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ replyMessageResponseHeader,
+ ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+ asyncContext.onComplete(responseEventMeshCommand);
+ return;
+ }
+
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
if (!eventMeshProducer.getStarted().get()) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index 8264385..07fb4df 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -45,6 +45,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.commons.lang3.StringUtils;
+import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -195,6 +196,19 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, ttl).build();
}
+ String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+ if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+ httpLogger.error("Event size exceeds the limit: {}",
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ sendMessageResponseHeader,
+ SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+ asyncContext.onComplete(responseEventMeshCommand);
+ return;
+ }
+
try {
// body
//omsMsg.setBody(sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index b80fec4..529d3ce 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -43,6 +43,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.commons.lang3.StringUtils;
+import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -191,6 +192,19 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
return;
}
+ String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+ if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+ httpLogger.error("Event size exceeds the limit: {}",
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ sendMessageResponseHeader,
+ SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+ asyncContext.onComplete(responseEventMeshCommand);
+ return;
+ }
+
EventMeshProducer eventMeshProducer =
eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index ffcd199..af4d26a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -41,6 +41,7 @@ import org.apache.eventmesh.runtime.util.Utils;
import org.apache.commons.lang3.StringUtils;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@@ -86,6 +87,12 @@ public class MessageTransferTask extends AbstractTask {
throw new Exception("event is null");
}
+ String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+ if (content.length() > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize) {
+ throw new Exception("event size exceeds the limit: "
+ + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize);
+ }
+
//do acl check in sending msg
if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable) {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org