You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/01/04 08:01:42 UTC
[incubator-eventmesh] branch master updated: Simplify the code (#2802)
This is an automated email from the ASF dual-hosted git repository.
jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new f944132ff Simplify the code (#2802)
f944132ff is described below
commit f944132ff8e787e267e6cc5bffd7dc0b4c55ce40
Author: weihubeats <we...@163.com>
AuthorDate: Wed Jan 4 16:01:36 2023 +0800
Simplify the code (#2802)
---
.../http/processor/SendAsyncMessageProcessor.java | 104 ++++++++++-----------
1 file changed, 48 insertions(+), 56 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index 8a607a42b..86d12dd86 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -34,6 +34,7 @@ import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
+import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
@@ -59,6 +60,9 @@ import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.trace.Span;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
public class SendAsyncMessageProcessor implements HttpRequestProcessor {
public Logger messageLogger = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
@@ -69,47 +73,43 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
public Logger aclLogger = LoggerFactory.getLogger(EventMeshConstants.ACL);
- private EventMeshHTTPServer eventMeshHTTPServer;
-
- public SendAsyncMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
- this.eventMeshHTTPServer = eventMeshHTTPServer;
- }
+ private final EventMeshHTTPServer eventMeshHTTPServer;
@Override
public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
HttpCommand responseEventMeshCommand;
-
+ String localAddress = IPUtils.getLocalAddress();
+ HttpCommand request = asyncContext.getRequest();
cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(
- Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+ Integer.valueOf(request.getRequestCode())),
EventMeshConstants.PROTOCOL_HTTP,
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtils.getLocalAddress());
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress);
- SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
+ SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) request.getHeader();
+ EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
SendMessageResponseHeader sendMessageResponseHeader =
- SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster(),
- IPUtils.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv(),
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
+ SendMessageResponseHeader.buildHeader(Integer.valueOf(request.getRequestCode()),
+ eventMeshHttpConfiguration.getEventMeshCluster(),
+ localAddress, eventMeshHttpConfiguration.getEventMeshEnv(),
+ eventMeshHttpConfiguration.getEventMeshIDC());
String protocolType = sendMessageRequestHeader.getProtocolType();
String protocolVersin = sendMessageRequestHeader.getProtocolVersion();
ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
ProtocolPluginFactory.getProtocolAdaptor(protocolType);
- CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());
+ CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(request);
Span span = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event),
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, true);
//validate event
if (event == null
- || StringUtils.isBlank(event.getId())
|| event.getSource() == null
|| event.getSpecVersion() == null
- || StringUtils.isBlank(event.getType())
- || StringUtils.isBlank(event.getSubject())) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ || StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
+ responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
@@ -127,11 +127,9 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
//validate event-extension
- if (StringUtils.isBlank(idc)
- || StringUtils.isBlank(pid)
- || !StringUtils.isNumeric(pid)
- || StringUtils.isBlank(sys)) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ if (StringUtils.isAnyBlank(idc, pid, sys)
+ || !StringUtils.isNumeric(pid)) {
+ responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
@@ -150,12 +148,9 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
String topic = event.getSubject();
//validate body
- if (StringUtils.isBlank(bizNo)
- || StringUtils.isBlank(uniqueId)
- || StringUtils.isBlank(producerGroup)
- || StringUtils.isBlank(topic)
+ if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
|| event.getData() == null) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
@@ -169,16 +164,16 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
}
//do acl check
- if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
+ if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
String user = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.USERNAME)).toString();
String pass = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PASSWD)).toString();
String subsystem = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
- int requestCode = Integer.parseInt(asyncContext.getRequest().getRequestCode());
+ int requestCode = Integer.parseInt(request.getRequestCode());
try {
Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
} catch (Exception e) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
asyncContext.onComplete(responseEventMeshCommand);
@@ -195,7 +190,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
// control flow rate limit
if (!eventMeshHTTPServer.getMsgRateLimiter()
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
@@ -212,7 +207,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
if (!eventMeshProducer.getStarted().get()) {
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getErrMsg()));
@@ -233,14 +228,14 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
}
String content = event.getData() == null ? "" : new String(event.getData().toBytes(), StandardCharsets.UTF_8);
- if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
+ if (content.length() > eventMeshHttpConfiguration.eventMeshEventSize) {
httpLogger.error("Event size exceeds the limit: {}",
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);
+ eventMeshHttpConfiguration.eventMeshEventSize);
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR.getRetCode(),
- "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
+ "Event size exceeds the limit: " + eventMeshHttpConfiguration.eventMeshEventSize));
asyncContext.onComplete(responseEventMeshCommand);
Span excepSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersin, event),
@@ -253,9 +248,9 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
try {
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
- .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, asyncContext.getRequest().reqTime)
+ .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, request.reqTime)
.withExtension(EventMeshConstants.REQ_SEND_EVENTMESH_IP,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
+ eventMeshHttpConfiguration.getEventMeshServerIp())
.build();
if (messageLogger.isDebugEnabled()) {
@@ -263,7 +258,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
}
} catch (Exception e) {
messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
- responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
@@ -282,20 +277,17 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
long startTime = System.currentTimeMillis();
- final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() {
- @Override
- public void onResponse(HttpCommand httpCommand) {
- try {
- if (httpLogger.isDebugEnabled()) {
- httpLogger.debug("{}", httpCommand);
- }
- eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
-
- eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordHTTPReqResTimeCost(
- System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
- } catch (Exception ex) {
- //ignore
+ final CompleteHandler<HttpCommand> handler = httpCommand -> {
+ try {
+ if (httpLogger.isDebugEnabled()) {
+ httpLogger.debug("{}", httpCommand);
}
+ eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
+
+ eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordHTTPReqResTimeCost(
+ System.currentTimeMillis() - request.getReqTime());
+ } catch (Exception ex) {
+ //ignore
}
};
@@ -313,7 +305,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
@Override
public void onSuccess(SendResult sendResult) {
- HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse(
+ HttpCommand succ = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
EventMeshRetCode.SUCCESS.getErrMsg() + sendResult.toString()));
@@ -328,7 +320,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
@Override
public void onException(OnExceptionContext context) {
- HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
+ HttpCommand err = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
@@ -353,7 +345,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
} catch (Exception ex) {
- HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
+ HttpCommand err = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org