You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/01/09 01:42:17 UTC
[incubator-eventmesh] branch master updated: fix issue2674
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 57a596c64 fix issue2674
new 078142233 Merge pull request #2825 from jonyangx/issue2674
57a596c64 is described below
commit 57a596c64b94dca520387910371667e8704871cc
Author: jonyangx <ya...@gmail.com>
AuthorDate: Thu Jan 5 08:06:28 2023 +0800
fix issue2674
---
.../http/processor/SendSyncMessageProcessor.java | 186 +++++++++++----------
1 file changed, 101 insertions(+), 85 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 064553488..6b5deb4c7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
@@ -47,49 +48,44 @@ import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelHandlerContext;
-public class SendSyncMessageProcessor implements HttpRequestProcessor {
-
- public Logger messageLogger = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
-
- public Logger cmdLogger = LoggerFactory.getLogger(EventMeshConstants.CMD);
-
- public Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
-
- public Logger aclLogger = LoggerFactory.getLogger(EventMeshConstants.ACL);
+import lombok.extern.slf4j.Slf4j;
- private EventMeshHTTPServer eventMeshHTTPServer;
+@Slf4j
+public class SendSyncMessageProcessor implements HttpRequestProcessor {
+ private transient EventMeshHTTPServer eventMeshHTTPServer;
- public SendSyncMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
+ public SendSyncMessageProcessor(final EventMeshHTTPServer eventMeshHTTPServer) {
this.eventMeshHTTPServer = eventMeshHTTPServer;
}
@Override
- public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext)
+ public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext)
throws Exception {
HttpCommand responseEventMeshCommand;
- cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
- RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
- EventMeshConstants.PROTOCOL_HTTP,
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress());
+ final String localAddress = IPUtils.getLocalAddress();
+
+ if (log.isInfoEnabled()) {
+ log.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+ RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+ EventMeshConstants.PROTOCOL_HTTP,
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress);
+ }
- ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
+ final ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
ProtocolPluginFactory.getProtocolAdaptor("cloudevents");
- CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());
+ final CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());
- SendMessageResponseHeader sendMessageResponseHeader =
+ final SendMessageResponseHeader sendMessageResponseHeader =
SendMessageResponseHeader
.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster(),
- IPUtils.getLocalAddress(),
+ localAddress,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv(),
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
@@ -109,11 +105,11 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
return;
}
- String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC))
+ final String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC))
.toString();
- String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID))
+ final String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID))
.toString();
- String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS))
+ final String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS))
.toString();
//validate event-extension
@@ -130,15 +126,15 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
return;
}
- String bizNo =
+ final String bizNo =
Objects.requireNonNull(event.getExtension(SendMessageRequestBody.BIZSEQNO)).toString();
- String uniqueId =
+ final String uniqueId =
Objects.requireNonNull(event.getExtension(SendMessageRequestBody.UNIQUEID)).toString();
- String producerGroup =
+ final String producerGroup =
Objects.requireNonNull(event.getExtension(SendMessageRequestBody.PRODUCERGROUP))
.toString();
- String topic = event.getSubject();
- String ttl =
+ final String topic = event.getSubject();
+ final String ttl =
Objects.requireNonNull(event.getExtension(SendMessageRequestBody.TTL)).toString();
//validate body
@@ -159,10 +155,10 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
//do acl check
if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
- String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- String user = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.USERNAME)).toString();
- String pass = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PASSWD)).toString();
- int requestCode = Integer.parseInt(asyncContext.getRequest().getRequestCode());
+ final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ final String user = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.USERNAME)).toString();
+ final String pass = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PASSWD)).toString();
+ final int requestCode = Integer.parseInt(asyncContext.getRequest().getRequestCode());
try {
Acl.doAclCheckInHttpSend(remoteAddr, user, pass, sys, topic, requestCode);
@@ -174,7 +170,10 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(),
e.getMessage()));
asyncContext.onComplete(responseEventMeshCommand);
- aclLogger.warn("CLIENT HAS NO PERMISSION,SendSyncMessageProcessor send failed", e);
+
+ if (log.isWarnEnabled()) {
+ log.warn("CLIENT HAS NO PERMISSION,SendSyncMessageProcessor send failed", e);
+ }
return;
}
}
@@ -184,29 +183,31 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS,
TimeUnit.MILLISECONDS)) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody
- .buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordHTTPDiscard();
asyncContext.onComplete(responseEventMeshCommand);
return;
}
- String content = new String(Objects.requireNonNull(event.getData()).toBytes(), StandardCharsets.UTF_8);
+ final String content = new String(Objects.requireNonNull(event.getData()).toBytes(), StandardCharsets.UTF_8);
if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
- httpLogger.error("Event size exceeds the limit: {}",
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+ if (log.isErrorEnabled()) {
+ log.error("Event size exceeds the limit: {}",
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+ }
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
- "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+ sendMessageResponseHeader,
+ SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
- EventMeshProducer eventMeshProducer =
+ final EventMeshProducer eventMeshProducer =
eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
if (!eventMeshProducer.getStarted().get()) {
@@ -219,19 +220,22 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
return;
}
+ CloudEvent newEevent;
try {
- event = CloudEventBuilder.from(event)
+ newEevent = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
.withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
- if (messageLogger.isDebugEnabled()) {
- messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
+ if (log.isDebugEnabled()) {
+ log.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
}
} catch (Exception e) {
- messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
+ if (log.isErrorEnabled()) {
+ log.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
+ }
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody
@@ -243,23 +247,24 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
}
final SendMessageContext sendMessageContext =
- new SendMessageContext(bizNo, event, eventMeshProducer,
+ new SendMessageContext(bizNo, newEevent, eventMeshProducer,
eventMeshHTTPServer);
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendMsg();
- long startTime = System.currentTimeMillis();
+ final long startTime = System.currentTimeMillis();
final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() {
@Override
- public void onResponse(HttpCommand httpCommand) {
+ public void onResponse(final HttpCommand httpCommand) {
try {
- if (httpLogger.isDebugEnabled()) {
- httpLogger.debug("{}", httpCommand);
+ if (log.isDebugEnabled()) {
+ log.debug("{}", httpCommand);
}
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordHTTPReqResTimeCost(
- System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
+ System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
} catch (Exception ex) {
+ log.error("onResponse error", ex);
// ignore
}
}
@@ -269,45 +274,51 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
try {
eventMeshProducer.request(sendMessageContext, new RequestReplyCallback() {
@Override
- public void onSuccess(CloudEvent event) {
- messageLogger.info("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
- + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
- topic, bizNo, uniqueId);
+ public void onSuccess(final CloudEvent event) {
+ if (log.isInfoEnabled()) {
+ log.info("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
+ + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
+ topic, bizNo, uniqueId);
+ }
try {
- event = CloudEventBuilder.from(event)
+ final CloudEvent newEvent = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.build();
- final String rtnMsg = new String(Objects.requireNonNull(event.getData()).toBytes(),
+
+ final String rtnMsg = new String(Objects.requireNonNull(newEvent.getData()).toBytes(),
StandardCharsets.UTF_8);
- HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse(
+ final HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
JsonUtils.serialize(SendMessageResponseBody.ReplyMessage.builder()
.topic(topic)
.body(rtnMsg)
- .properties(EventMeshUtil.getEventProp(event))
+ .properties(EventMeshUtil.getEventProp(newEvent))
.build())));
asyncContext.onComplete(succ, handler);
} catch (Exception ex) {
- HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
+ final HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(
EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg()
+ EventMeshUtil.stackTrace(ex, 2)));
asyncContext.onComplete(err, handler);
- messageLogger.warn("message|mq2eventMesh|RSP", ex);
+
+ if (log.isWarnEnabled()) {
+ log.warn("message|mq2eventMesh|RSP", ex);
+ }
}
}
@Override
- public void onException(Throwable e) {
- HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
+ public void onException(final Throwable e) {
+ final HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody
.buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
@@ -315,29 +326,34 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
+ EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(err, handler);
- eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
- messageLogger.error(
- "message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
- + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
- topic, bizNo, uniqueId, e);
+ eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+ if (log.isErrorEnabled()) {
+ log.error(
+ "message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
+ + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
+ topic, bizNo, uniqueId, e);
+ }
}
}, Integer.parseInt(ttl));
} catch (Exception ex) {
- HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody
- .buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg()
- + EventMeshUtil.stackTrace(ex, 2)));
+ final HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg()
+ + EventMeshUtil.stackTrace(ex, 2)));
asyncContext.onComplete(err);
- eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
- long endTime = System.currentTimeMillis();
+ eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+ final long endTime = System.currentTimeMillis();
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendMsgFailed();
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendMsgCost(endTime - startTime);
- messageLogger.error(
- "message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
- endTime - startTime, topic, bizNo, uniqueId, ex);
+
+ if (log.isErrorEnabled()) {
+ log.error(
+ "message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
+ endTime - startTime, topic, bizNo, uniqueId, ex);
+ }
}
return;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org