You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by we...@apache.org on 2022/02/13 01:50:41 UTC

[incubator-eventmesh] branch master updated: [Issue #655] Adding send message constraints for message size and batch size (#760)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 85de5c7  [Issue #655] Adding send message constraints for message size and batch size (#760)
85de5c7 is described below

commit 85de5c7a6f0ae2533257babb7b7cdb9b8f566c3a
Author: jinrongluo <ka...@gmail.com>
AuthorDate: Sat Feb 12 20:50:36 2022 -0500

    [Issue #655] Adding send message constraints for message size and batch size (#760)
    
    * [Issue #655] Adding send message constraints for message size and batch size
---
 .../tcp/demo/pub/cloudevents/AsyncPublish.java     |  2 +-
 eventmesh-runtime/conf/eventmesh.properties        |  9 ++++++--
 .../configuration/EventMeshHTTPConfiguration.java  | 22 ++++++++++++++++--
 .../configuration/EventMeshTCPConfiguration.java   | 10 +++++++++
 .../http/processor/BatchSendMessageProcessor.java  | 26 ++++++++++++++++++++++
 .../processor/BatchSendMessageV2Processor.java     | 14 ++++++++++++
 .../http/processor/ReplyMessageProcessor.java      | 14 ++++++++++++
 .../http/processor/SendAsyncMessageProcessor.java  | 14 ++++++++++++
 .../http/processor/SendSyncMessageProcessor.java   | 14 ++++++++++++
 .../tcp/client/task/MessageTransferTask.java       |  7 ++++++
 10 files changed, 127 insertions(+), 5 deletions(-)

diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
index df96422..7621885 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
@@ -55,7 +55,7 @@ public class AsyncPublish {
                     EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
             client.init();
 
-            for (int i = 0; i < 5; i++) {
+            for (int i = 0; i < 2; i++) {
                 CloudEvent event = EventMeshTestUtils.generateCloudEventV1Async();
                 logger.info("begin send async msg[{}]==================={}", i, event);
                 client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties
index 23425e1..84cedc9 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -39,6 +39,11 @@ eventMesh.server.tcp.msgReqnumPerSecond=15000
 eventMesh.server.http.msgReqnumPerSecond=15000
 eventMesh.server.session.upstreamBufferSize=20
 
+# for single event publish, maximum size of the event data allowed per event (counted in characters after UTF-8 decoding)
+eventMesh.server.maxEventSize=1000
+# for batch event publish, maximum number of events allowed in one batch
+eventMesh.server.maxEventBatchSize=10
+
 # thread number about global scheduler
 eventMesh.server.global.scheduler=5
 eventMesh.server.tcp.taskHandleExecutorPoolSize=8
@@ -61,8 +66,8 @@ eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000
 eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200
 
 #ip address blacklist
-eventmesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
-eventmesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8
+eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
+eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8
 
 #connector plugin
 eventMesh.connector.plugin.type=standalone
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
index a71b814..c0699c9 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
@@ -83,6 +83,10 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
 
     public int eventMeshBatchMsgRequestNumPerSecond = 20000;
 
+    public int eventMeshEventSize = 1000;
+
+    public int eventMeshEventBatchSize = 10;
+
     public List<IPAddress> eventMeshIpv4BlackList = Collections.emptyList();
 
     public List<IPAddress> eventMeshIpv6BlackList = Collections.emptyList();
@@ -266,6 +270,16 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
 
             }
 
+            String eventSize = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_EVENTSIZE);
+            if (StringUtils.isNotEmpty(eventSize) && StringUtils.isNumeric(eventSize)) {
+                eventMeshEventSize = Integer.parseInt(eventSize);
+            }
+
+            String eventBatchSize = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE);
+            if (StringUtils.isNotEmpty(eventBatchSize) && StringUtils.isNumeric(eventBatchSize)) {
+                eventMeshEventBatchSize = Integer.parseInt(eventBatchSize);
+            }
+
             String ipv4BlackList = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST);
             if (StringUtils.isNotEmpty(ipv4BlackList)) {
                 eventMeshIpv4BlackList = getBlacklist(ipv4BlackList);
@@ -339,8 +353,12 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
 
         public static String KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND = "eventMesh.server.http.msgReqnumPerSecond";
 
-        public static String KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST = "eventmesh.server.blacklist.ipv4";
+        public static String KEY_EVENTMESH_SERVER_EVENTSIZE = "eventMesh.server.maxEventSize";
+
+        public static String KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventMesh.server.maxEventBatchSize";
+
+        public static String KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST = "eventMesh.server.blacklist.ipv4";
 
-        public static String KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST = "eventmesh.server.blacklist.ipv6";
+        public static String KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST = "eventMesh.server.blacklist.ipv6";
     }
 }
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
index 0ab59cb..5544b0b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java
@@ -76,6 +76,10 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {
 
     public int sleepIntervalInRebalanceRedirectMills = 200;
 
+    public int eventMeshEventSize = 1000;
+
+    public int eventMeshEventBatchSize = 10;
+
     private TrafficShapingConfig gtc = new TrafficShapingConfig(0, 10_000, 1_000, 2000);
     private TrafficShapingConfig ctc = new TrafficShapingConfig(0, 2_000, 1_000, 10_000);
 
@@ -155,6 +159,10 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {
         sleepIntervalInRebalanceRedirectMills = configurationWrapper.getIntProp(
                 ConfKeys.KEYS_EVENTMESH_SERVER_REBALANCE_REDIRECT_SLEEP_TIME, sleepIntervalInRebalanceRedirectMills);
 
+        eventMeshEventSize = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_EVENTSIZE, eventMeshEventSize);
+
+        eventMeshEventBatchSize = configurationWrapper.getIntProp(
+                ConfKeys.KEYS_EVENTMESH_SERVER_EVENT_BATCHSIZE, eventMeshEventBatchSize);
     }
 
     public TrafficShapingConfig getGtc() {
@@ -191,6 +199,8 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {
         public static String KEYS_EVENTMESH_SERVER_PUSH_FAIL_ISOLATE_TIME = "eventMesh.server.tcp.pushFailIsolateTimeInMills";
         public static String KEYS_EVENTMESH_SERVER_GRACEFUL_SHUTDOWN_SLEEP_TIME = "eventMesh.server.gracefulShutdown.sleepIntervalInMills";
         public static String KEYS_EVENTMESH_SERVER_REBALANCE_REDIRECT_SLEEP_TIME = "eventMesh.server.rebalanceRedirect.sleepIntervalInM";
+        public static String KEYS_EVENTMESH_SERVER_EVENTSIZE = "eventMesh.server.maxEventSize";
+        public static String KEYS_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventMesh.server.maxEventBatchSize";
     }
 
     public static class TrafficShapingConfig {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index 2f2103b..13c0605 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -46,6 +46,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -111,6 +112,18 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
         String producerGroup = "";
         int eventSize = eventList.size();
 
+        if (eventSize > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize) {
+            batchMessageLogger.error("Event batch size exceeds the limit: {}",
+                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize);
+
+            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                sendMessageBatchResponseHeader,
+                SendMessageBatchResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+                    "Event batch size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize));
+            asyncContext.onComplete(responseEventMeshCommand);
+            return;
+        }
+
         for (CloudEvent event : eventList) {
             //validate event
             if (StringUtils.isBlank(event.getId())
@@ -126,6 +139,19 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
                 return;
             }
 
+            String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+            if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+                batchMessageLogger.error("Event size exceeds the limit: {}",
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+
+                responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                    sendMessageBatchResponseHeader,
+                    SendMessageBatchResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+                        "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+                asyncContext.onComplete(responseEventMeshCommand);
+                return;
+            }
+
             String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC)).toString();
             String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID)).toString();
             String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index 9b9f44e..2b23d78 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -46,6 +46,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -158,6 +159,19 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
             return;
         }
 
+        String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+        if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+            batchMessageLogger.error("Event size exceeds the limit: {}",
+                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+
+            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                sendMessageBatchV2ResponseHeader,
+                SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+                    "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+            asyncContext.onComplete(responseEventMeshCommand);
+            return;
+        }
+
         //do acl check
         if (eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
             String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
index 45ec636..f35eb0e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
@@ -46,6 +46,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -152,6 +153,19 @@ public class ReplyMessageProcessor implements HttpRequestProcessor {
             return;
         }
 
+        String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+        if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+            httpLogger.error("Event size exceeds the limit: {}",
+                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+
+            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                replyMessageResponseHeader,
+                ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+                    "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+            asyncContext.onComplete(responseEventMeshCommand);
+            return;
+        }
+
         EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
         if (!eventMeshProducer.getStarted().get()) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index 8264385..07fb4df 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -45,6 +45,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -195,6 +196,19 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
             event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, ttl).build();
         }
 
+        String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+        if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+            httpLogger.error("Event size exceeds the limit: {}",
+                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+
+            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                sendMessageResponseHeader,
+                SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+                    "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+            asyncContext.onComplete(responseEventMeshCommand);
+            return;
+        }
+
         try {
             // body
             //omsMsg.setBody(sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index b80fec4..529d3ce 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -43,6 +43,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -191,6 +192,19 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
             return;
         }
 
+        String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+        if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+            httpLogger.error("Event size exceeds the limit: {}",
+                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+
+            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                sendMessageResponseHeader,
+                SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+                    "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+            asyncContext.onComplete(responseEventMeshCommand);
+            return;
+        }
+
         EventMeshProducer eventMeshProducer =
                 eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index ffcd199..af4d26a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -41,6 +41,7 @@ import org.apache.eventmesh.runtime.util.Utils;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -86,6 +87,12 @@ public class MessageTransferTask extends AbstractTask {
                 throw new Exception("event is null");
             }
 
+            String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
+            if (content.length() > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize) {
+                throw new Exception("event size exceeds the limit: "
+                    + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize);
+            }
+
             //do acl check in sending msg
             if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable) {
                 String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org