You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/01/04 08:01:42 UTC

[incubator-eventmesh] branch master updated: Simplify the code (#2802)

This is an automated email from the ASF dual-hosted git repository.

jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new f944132ff Simplify the code (#2802)
f944132ff is described below

commit f944132ff8e787e267e6cc5bffd7dc0b4c55ce40
Author: weihubeats <we...@163.com>
AuthorDate: Wed Jan 4 16:01:36 2023 +0800

    Simplify the code (#2802)
---
 .../http/processor/SendAsyncMessageProcessor.java  | 104 ++++++++++-----------
 1 file changed, 48 insertions(+), 56 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index 8a607a42b..86d12dd86 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -34,6 +34,7 @@ import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
 import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
 import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
+import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
@@ -59,6 +60,9 @@ import io.cloudevents.core.builder.CloudEventBuilder;
 import io.netty.channel.ChannelHandlerContext;
 import io.opentelemetry.api.trace.Span;
 
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
 public class SendAsyncMessageProcessor implements HttpRequestProcessor {
 
     public Logger messageLogger = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
@@ -69,47 +73,43 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
 
     public Logger aclLogger = LoggerFactory.getLogger(EventMeshConstants.ACL);
 
-    private EventMeshHTTPServer eventMeshHTTPServer;
-
-    public SendAsyncMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
-        this.eventMeshHTTPServer = eventMeshHTTPServer;
-    }
+    private final EventMeshHTTPServer eventMeshHTTPServer;
 
     @Override
     public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
 
         HttpCommand responseEventMeshCommand;
-
+        String localAddress = IPUtils.getLocalAddress();
+        HttpCommand request = asyncContext.getRequest();
         cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(
-                Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+                Integer.valueOf(request.getRequestCode())),
             EventMeshConstants.PROTOCOL_HTTP,
-            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress());
+            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress);
 
-        SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
+        SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) request.getHeader();
 
+        EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
         SendMessageResponseHeader sendMessageResponseHeader =
-            SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
-                    eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster(),
-                IPUtils.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv(),
-                    eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
+            SendMessageResponseHeader.buildHeader(Integer.valueOf(request.getRequestCode()),
+                    eventMeshHttpConfiguration.getEventMeshCluster(),
+                localAddress, eventMeshHttpConfiguration.getEventMeshEnv(),
+                    eventMeshHttpConfiguration.getEventMeshIDC());
 
         String protocolType = sendMessageRequestHeader.getProtocolType();
         String protocolVersin = sendMessageRequestHeader.getProtocolVersion();
         ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
                 ProtocolPluginFactory.getProtocolAdaptor(protocolType);
-        CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());
+        CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(request);
 
         Span span = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event),
             EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, true);
 
         //validate event
         if (event == null
-            || StringUtils.isBlank(event.getId())
             || event.getSource() == null
             || event.getSpecVersion() == null
-            || StringUtils.isBlank(event.getType())
-            || StringUtils.isBlank(event.getSubject())) {
-            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+            || StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
+            responseEventMeshCommand = request.createHttpCommandResponse(
                 sendMessageResponseHeader,
                 SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
                     EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
@@ -127,11 +127,9 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
         String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
 
         //validate event-extension
-        if (StringUtils.isBlank(idc)
-            || StringUtils.isBlank(pid)
-            || !StringUtils.isNumeric(pid)
-            || StringUtils.isBlank(sys)) {
-            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+        if (StringUtils.isAnyBlank(idc, pid, sys)
+            || !StringUtils.isNumeric(pid)) {
+            responseEventMeshCommand = request.createHttpCommandResponse(
                 sendMessageResponseHeader,
                 SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
                     EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
@@ -150,12 +148,9 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
         String topic = event.getSubject();
 
         //validate body
-        if (StringUtils.isBlank(bizNo)
-            || StringUtils.isBlank(uniqueId)
-            || StringUtils.isBlank(producerGroup)
-            || StringUtils.isBlank(topic)
+        if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
             || event.getData() == null) {
-            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+            responseEventMeshCommand = request.createHttpCommandResponse(
                 sendMessageResponseHeader,
                 SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
                     EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
@@ -169,16 +164,16 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
         }
 
         //do acl check
-        if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
+        if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
             String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
             String user = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.USERNAME)).toString();
             String pass = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PASSWD)).toString();
             String subsystem = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
-            int requestCode = Integer.parseInt(asyncContext.getRequest().getRequestCode());
+            int requestCode = Integer.parseInt(request.getRequestCode());
             try {
                 Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
             } catch (Exception e) {
-                responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                responseEventMeshCommand = request.createHttpCommandResponse(
                     sendMessageResponseHeader,
                     SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
                 asyncContext.onComplete(responseEventMeshCommand);
@@ -195,7 +190,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
         // control flow rate limit
         if (!eventMeshHTTPServer.getMsgRateLimiter()
             .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
-            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+            responseEventMeshCommand = request.createHttpCommandResponse(
                 sendMessageResponseHeader,
                 SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
                     EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
@@ -212,7 +207,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
         EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
         if (!eventMeshProducer.getStarted().get()) {
-            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+            responseEventMeshCommand = request.createHttpCommandResponse(
                 sendMessageResponseHeader,
                 SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(),
                     EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getErrMsg()));
@@ -233,14 +228,14 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
         }
 
         String content = event.getData() == null ? "" : new String(event.getData().toBytes(), StandardCharsets.UTF_8);
-        if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+        if (content.length() > eventMeshHttpConfiguration.eventMeshEventSize) {
             httpLogger.error("Event size exceeds the limit: {}",
-                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+                eventMeshHttpConfiguration.eventMeshEventSize);
 
-            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+            responseEventMeshCommand = request.createHttpCommandResponse(
                 sendMessageResponseHeader,
                 SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR.getRetCode(),
-                    "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+                    "Event size exceeds the limit: " + eventMeshHttpConfiguration.eventMeshEventSize));
             asyncContext.onComplete(responseEventMeshCommand);
 
             Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event),
@@ -253,9 +248,9 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
         try {
             event = CloudEventBuilder.from(event)
                 .withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
-                .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, asyncContext.getRequest().reqTime)
+                .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, request.reqTime)
                 .withExtension(EventMeshConstants.REQ_SEND_EVENTMESH_IP,
-                        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
+                        eventMeshHttpConfiguration.getEventMeshServerIp())
                 .build();
 
             if (messageLogger.isDebugEnabled()) {
@@ -263,7 +258,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
             }
         } catch (Exception e) {
             messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
-            responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+            responseEventMeshCommand = request.createHttpCommandResponse(
                 sendMessageResponseHeader,
                 SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getRetCode(),
                     EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
@@ -282,20 +277,17 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
 
         long startTime = System.currentTimeMillis();
 
-        final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() {
-            @Override
-            public void onResponse(HttpCommand httpCommand) {
-                try {
-                    if (httpLogger.isDebugEnabled()) {
-                        httpLogger.debug("{}", httpCommand);
-                    }
-                    eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
-
-                    eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordHTTPReqResTimeCost(
-                        System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
-                } catch (Exception ex) {
-                    //ignore
+        final CompleteHandler<HttpCommand> handler = httpCommand -> {
+            try {
+                if (httpLogger.isDebugEnabled()) {
+                    httpLogger.debug("{}", httpCommand);
                 }
+                eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
+
+                eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordHTTPReqResTimeCost(
+                    System.currentTimeMillis() - request.getReqTime());
+            } catch (Exception ex) {
+                //ignore
             }
         };
 
@@ -313,7 +305,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
 
                     @Override
                     public void onSuccess(SendResult sendResult) {
-                        HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse(
+                        HttpCommand succ = request.createHttpCommandResponse(
                             sendMessageResponseHeader,
                             SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
                                 EventMeshRetCode.SUCCESS.getErrMsg() + sendResult.toString()));
@@ -328,7 +320,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
 
                     @Override
                     public void onException(OnExceptionContext context) {
-                        HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
+                        HttpCommand err = request.createHttpCommandResponse(
                             sendMessageResponseHeader,
                             SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
                                 EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
@@ -353,7 +345,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
 
 
         } catch (Exception ex) {
-            HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
+            HttpCommand err = request.createHttpCommandResponse(
                 sendMessageResponseHeader,
                 SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
                     EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org