You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/01/09 01:42:17 UTC

[incubator-eventmesh] branch master updated: fix issue2674

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 57a596c64 fix issue2674
     new 078142233 Merge pull request #2825 from jonyangx/issue2674
57a596c64 is described below

commit 57a596c64b94dca520387910371667e8704871cc
Author: jonyangx <ya...@gmail.com>
AuthorDate: Thu Jan 5 08:06:28 2023 +0800

    fix issue2674
---
 .../http/processor/SendSyncMessageProcessor.java   | 186 +++++++++++----------
 1 file changed, 101 insertions(+), 85 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 064553488..6b5deb4c7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.processor;
 
+
 import org.apache.eventmesh.api.RequestReplyCallback;
 import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
 import org.apache.eventmesh.common.protocol.http.HttpCommand;
@@ -47,49 +48,44 @@ import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import io.cloudevents.CloudEvent;
 import io.cloudevents.core.builder.CloudEventBuilder;
 import io.netty.channel.ChannelHandlerContext;
 
-public class SendSyncMessageProcessor implements HttpRequestProcessor {
-
-    public Logger messageLogger = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
-
-    public Logger cmdLogger = LoggerFactory.getLogger(EventMeshConstants.CMD);
-
-    public Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
-
-    public Logger aclLogger = LoggerFactory.getLogger(EventMeshConstants.ACL);
+import lombok.extern.slf4j.Slf4j;
 
-    private EventMeshHTTPServer eventMeshHTTPServer;
+@Slf4j
+public class SendSyncMessageProcessor implements HttpRequestProcessor {
+    private transient EventMeshHTTPServer eventMeshHTTPServer;
 
-    public SendSyncMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
+    public SendSyncMessageProcessor(final EventMeshHTTPServer eventMeshHTTPServer) {
         this.eventMeshHTTPServer = eventMeshHTTPServer;
     }
 
     @Override
-    public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext)
+    public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext)
             throws Exception {
 
         HttpCommand responseEventMeshCommand;
 
-        cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
-                RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
-                EventMeshConstants.PROTOCOL_HTTP,
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress());
+        final String localAddress = IPUtils.getLocalAddress();
+
+        if (log.isInfoEnabled()) {
+            log.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+                    RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+                    EventMeshConstants.PROTOCOL_HTTP,
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress);
+        }
 
-        ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
+        final ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
                 ProtocolPluginFactory.getProtocolAdaptor("cloudevents");
-        CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());
+        final CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());
 
-        SendMessageResponseHeader sendMessageResponseHeader =
+        final SendMessageResponseHeader sendMessageResponseHeader =
                 SendMessageResponseHeader
                         .buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
                                 eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster(),
-                                IPUtils.getLocalAddress(),
+                                localAddress,
                                 eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv(),
                                 eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
 
@@ -109,11 +105,11 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
             return;
         }
 
-        String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC))
+        final String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC))
                 .toString();
-        String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID))
+        final String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID))
                 .toString();
-        String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS))
+        final String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS))
                 .toString();
 
         //validate event-extension
@@ -130,15 +126,15 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
             return;
         }
 
-        String bizNo =
+        final String bizNo =
                 Objects.requireNonNull(event.getExtension(SendMessageRequestBody.BIZSEQNO)).toString();
-        String uniqueId =
+        final String uniqueId =
                 Objects.requireNonNull(event.getExtension(SendMessageRequestBody.UNIQUEID)).toString();
-        String producerGroup =
+        final String producerGroup =
                 Objects.requireNonNull(event.getExtension(SendMessageRequestBody.PRODUCERGROUP))
                         .toString();
-        String topic = event.getSubject();
-        String ttl =
+        final String topic = event.getSubject();
+        final String ttl =
                 Objects.requireNonNull(event.getExtension(SendMessageRequestBody.TTL)).toString();
 
         //validate body
@@ -159,10 +155,10 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
 
         //do acl check
         if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
-            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            String user = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.USERNAME)).toString();
-            String pass = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PASSWD)).toString();
-            int requestCode = Integer.parseInt(asyncContext.getRequest().getRequestCode());
+            final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            final String user = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.USERNAME)).toString();
+            final String pass = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PASSWD)).toString();
+            final int requestCode = Integer.parseInt(asyncContext.getRequest().getRequestCode());
 
             try {
                 Acl.doAclCheckInHttpSend(remoteAddr, user, pass, sys, topic, requestCode);
@@ -174,7 +170,10 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
                                 .buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(),
                                         e.getMessage()));
                 asyncContext.onComplete(responseEventMeshCommand);
-                aclLogger.warn("CLIENT HAS NO PERMISSION,SendSyncMessageProcessor send failed", e);
+
+                if (log.isWarnEnabled()) {
+                    log.warn("CLIENT HAS NO PERMISSION,SendSyncMessageProcessor send failed", e);
+                }
                 return;
             }
         }
@@ -184,29 +183,31 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
                 .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS,
                         TimeUnit.MILLISECONDS)) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                sendMessageResponseHeader,
-                SendMessageResponseBody
-                    .buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
-                        EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
+                    sendMessageResponseHeader,
+                    SendMessageResponseBody
+                            .buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
+                                    EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
             eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordHTTPDiscard();
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
 
-        String content = new String(Objects.requireNonNull(event.getData()).toBytes(), StandardCharsets.UTF_8);
+        final String content = new String(Objects.requireNonNull(event.getData()).toBytes(), StandardCharsets.UTF_8);
         if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
-            httpLogger.error("Event size exceeds the limit: {}",
-                eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+            if (log.isErrorEnabled()) {
+                log.error("Event size exceeds the limit: {}",
+                        eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+            }
 
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                sendMessageResponseHeader,
-                SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
-                    "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+                    sendMessageResponseHeader,
+                    SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+                            "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
 
-        EventMeshProducer eventMeshProducer =
+        final EventMeshProducer eventMeshProducer =
                 eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
         if (!eventMeshProducer.getStarted().get()) {
@@ -219,19 +220,22 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
             return;
         }
 
+        CloudEvent newEevent;
         try {
-            event = CloudEventBuilder.from(event)
+            newEevent = CloudEventBuilder.from(event)
                     .withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
                     .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                     .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                     .build();
 
-            if (messageLogger.isDebugEnabled()) {
-                messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
+            if (log.isDebugEnabled()) {
+                log.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
             }
 
         } catch (Exception e) {
-            messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
+            if (log.isErrorEnabled()) {
+                log.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
+            }
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
                     sendMessageResponseHeader,
                     SendMessageResponseBody
@@ -243,23 +247,24 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
         }
 
         final SendMessageContext sendMessageContext =
-                new SendMessageContext(bizNo, event, eventMeshProducer,
+                new SendMessageContext(bizNo, newEevent, eventMeshProducer,
                         eventMeshHTTPServer);
         eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendMsg();
 
-        long startTime = System.currentTimeMillis();
+        final long startTime = System.currentTimeMillis();
 
         final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() {
             @Override
-            public void onResponse(HttpCommand httpCommand) {
+            public void onResponse(final HttpCommand httpCommand) {
                 try {
-                    if (httpLogger.isDebugEnabled()) {
-                        httpLogger.debug("{}", httpCommand);
+                    if (log.isDebugEnabled()) {
+                        log.debug("{}", httpCommand);
                     }
                     eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
                     eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordHTTPReqResTimeCost(
-                        System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
+                            System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
                 } catch (Exception ex) {
+                    log.error("onResponse error", ex);
                     // ignore
                 }
             }
@@ -269,45 +274,51 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
         try {
             eventMeshProducer.request(sendMessageContext, new RequestReplyCallback() {
                 @Override
-                public void onSuccess(CloudEvent event) {
-                    messageLogger.info("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
-                                    + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
-                            topic, bizNo, uniqueId);
+                public void onSuccess(final CloudEvent event) {
+                    if (log.isInfoEnabled()) {
+                        log.info("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
+                                        + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
+                                topic, bizNo, uniqueId);
+                    }
 
                     try {
-                        event = CloudEventBuilder.from(event)
+                        final CloudEvent newEvent = CloudEventBuilder.from(event)
                                 .withExtension(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP,
                                         String.valueOf(System.currentTimeMillis()))
                                 .withExtension(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP,
                                         String.valueOf(System.currentTimeMillis()))
                                 .build();
-                        final String rtnMsg = new String(Objects.requireNonNull(event.getData()).toBytes(),
+
+                        final String rtnMsg = new String(Objects.requireNonNull(newEvent.getData()).toBytes(),
                                 StandardCharsets.UTF_8);
 
-                        HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse(
+                        final HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse(
                                 sendMessageResponseHeader,
                                 SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
                                         JsonUtils.serialize(SendMessageResponseBody.ReplyMessage.builder()
                                                 .topic(topic)
                                                 .body(rtnMsg)
-                                                .properties(EventMeshUtil.getEventProp(event))
+                                                .properties(EventMeshUtil.getEventProp(newEvent))
                                                 .build())));
                         asyncContext.onComplete(succ, handler);
                     } catch (Exception ex) {
-                        HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
+                        final HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
                                 sendMessageResponseHeader,
                                 SendMessageResponseBody.buildBody(
                                         EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
                                         EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg()
                                                 + EventMeshUtil.stackTrace(ex, 2)));
                         asyncContext.onComplete(err, handler);
-                        messageLogger.warn("message|mq2eventMesh|RSP", ex);
+
+                        if (log.isWarnEnabled()) {
+                            log.warn("message|mq2eventMesh|RSP", ex);
+                        }
                     }
                 }
 
                 @Override
-                public void onException(Throwable e) {
-                    HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
+                public void onException(final Throwable e) {
+                    final HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
                             sendMessageResponseHeader,
                             SendMessageResponseBody
                                     .buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
@@ -315,29 +326,34 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
                                                     + EventMeshUtil.stackTrace(e, 2)));
                     asyncContext.onComplete(err, handler);
 
-                    eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
-                    messageLogger.error(
-                            "message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
-                                    + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
-                            topic, bizNo, uniqueId, e);
+                    eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+                    if (log.isErrorEnabled()) {
+                        log.error(
+                                "message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
+                                        + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
+                                topic, bizNo, uniqueId, e);
+                    }
                 }
             }, Integer.parseInt(ttl));
         } catch (Exception ex) {
-            HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
-                sendMessageResponseHeader,
-                SendMessageResponseBody
-                    .buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
-                        EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg()
-                            + EventMeshUtil.stackTrace(ex, 2)));
+            final HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
+                    sendMessageResponseHeader,
+                    SendMessageResponseBody
+                            .buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
+                                    EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg()
+                                            + EventMeshUtil.stackTrace(ex, 2)));
             asyncContext.onComplete(err);
 
-            eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
-            long endTime = System.currentTimeMillis();
+            eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+            final long endTime = System.currentTimeMillis();
             eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendMsgFailed();
             eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendMsgCost(endTime - startTime);
-            messageLogger.error(
-                "message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
-                endTime - startTime, topic, bizNo, uniqueId, ex);
+
+            if (log.isErrorEnabled()) {
+                log.error(
+                        "message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
+                        endTime - startTime, topic, bizNo, uniqueId, ex);
+            }
         }
 
         return;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org