You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/04 13:31:08 UTC

[incubator-eventmesh] branch master updated: fix issue2371

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new bfadb00aa fix issue2371
     new 3337a538c Merge pull request #2446 from jonyangx/issue2371
bfadb00aa is described below

commit bfadb00aa80a032605b8f81d96cb8a7e5268c59d
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sun Dec 4 09:23:45 2022 +0800

    fix issue2371
---
 .../protocol/http/push/AsyncHTTPPushRequest.java   | 143 ++++++++++++---------
 1 file changed, 79 insertions(+), 64 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index a2e5cdee0..fa8ff81c8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -73,11 +73,12 @@ import com.google.common.collect.Sets;
 
 public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
-    public Logger messageLogger = LoggerFactory.getLogger("message");
+    public static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger("message");
 
-    public Logger cmdLogger = LoggerFactory.getLogger("cmd");
+    public static final Logger CMD_LOGGER = LoggerFactory.getLogger("cmd");
+
+    public static final Logger LOGGER = LoggerFactory.getLogger("AsyncHTTPPushRequest");
 
-    public Logger logger = LoggerFactory.getLogger(this.getClass());
     public String currPushUrl;
     private Map<String, Set<AbstractHTTPPushRequest>> waitingRequests;
 
@@ -100,7 +101,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
         String requestCode = "";
 
-        if (SubscriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
+        if (SubscriptionType.SYNC == handleMsgContext.getSubscriptionItem().getType()) {
             requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode());
         } else {
             requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
@@ -110,20 +111,20 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
         builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
         builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
         builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
-            handleMsgContext.getEventMeshHTTPServer()
-                .getEventMeshHttpConfiguration().eventMeshCluster);
+                handleMsgContext.getEventMeshHTTPServer()
+                        .getEventMeshHttpConfiguration().eventMeshCluster);
         builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress());
         builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
-            handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
+                handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
         builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
-            handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
+                handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
 
         CloudEvent event = CloudEventBuilder.from(handleMsgContext.getEvent())
-            .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
-                String.valueOf(System.currentTimeMillis()))
-            .withExtension(EventMeshConstants.RSP_URL, currPushUrl)
-            .withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup())
-            .build();
+                .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
+                        String.valueOf(System.currentTimeMillis()))
+                .withExtension(EventMeshConstants.RSP_URL, currPushUrl)
+                .withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup())
+                .build();
         handleMsgContext.setEvent(event);
 
         String content = "";
@@ -133,7 +134,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
             ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
 
             ProtocolTransportObject protocolTransportObject =
-                protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
+                    protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
             if (protocolTransportObject instanceof HttpCommand) {
                 content = ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString();
             } else {
@@ -149,7 +150,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
             }
 
         } catch (Exception ex) {
-            logger.error("Failed to convert EventMeshMessage from CloudEvent", ex);
+            LOGGER.error("Failed to convert EventMeshMessage from CloudEvent", ex);
             return;
         }
 
@@ -157,25 +158,25 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
         body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content));
         if (StringUtils.isBlank(handleMsgContext.getBizSeqNo())) {
             body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
-                RandomStringUtils.generateNum(20)));
+                    RandomStringUtils.generateNum(20)));
         } else {
             body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
-                handleMsgContext.getBizSeqNo()));
+                    handleMsgContext.getBizSeqNo()));
         }
         if (StringUtils.isBlank(handleMsgContext.getUniqueId())) {
             body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
-                RandomStringUtils.generateNum(20)));
+                    RandomStringUtils.generateNum(20)));
         } else {
             body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
-                handleMsgContext.getUniqueId()));
+                    handleMsgContext.getUniqueId()));
         }
 
         body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO,
-            handleMsgContext.getMsgRandomNo()));
+                handleMsgContext.getMsgRandomNo()));
         body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic()));
 
         body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS,
-            JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
+                JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
 
         HttpEntity httpEntity = new UrlEncodedFormEntity(body, StandardCharsets.UTF_8);
 
@@ -183,10 +184,10 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
         // for CloudEvents Webhook spec
         String urlAuthType = handleMsgContext.getConsumerGroupConfig().getConsumerGroupTopicConf()
-            .get(handleMsgContext.getTopic()).getHttpAuthTypeMap().get(currPushUrl);
+                .get(handleMsgContext.getTopic()).getHttpAuthTypeMap().get(currPushUrl);
 
         WebhookUtil.setWebhookHeaders(builder, httpEntity.getContentType().getValue(), eventMeshHttpConfiguration.eventMeshWebhookOrigin,
-            urlAuthType);
+                urlAuthType);
 
         eventMeshHTTPServer.metrics.getSummaryMetrics().recordPushMsg();
 
@@ -194,8 +195,10 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
         addToWaitingMap(this);
 
-        cmdLogger.info("cmd={}|eventMesh2client|from={}|to={}", requestCode,
-            IPUtils.getLocalAddress(), currPushUrl);
+        if (CMD_LOGGER.isInfoEnabled()) {
+            CMD_LOGGER.info("cmd={}|eventMesh2client|from={}|to={}", requestCode,
+                    IPUtils.getLocalAddress(), currPushUrl);
+        }
 
         try {
             eventMeshHTTPServer.httpClientPool.getClient().execute(builder, new ResponseHandler<Object>() {
@@ -210,17 +213,19 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
                         String res = "";
                         try {
                             res = EntityUtils.toString(response.getEntity(),
-                                Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
+                                    Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
                         } catch (IOException e) {
                             handleMsgContext.finish();
                             return new Object();
                         }
                         ClientRetCode result = processResponseContent(res);
-                        messageLogger.info(
-                            "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
-                                + "|uniqueId={}|cost={}",
-                            result, currPushUrl, handleMsgContext.getTopic(),
-                            handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+                        if (MESSAGE_LOGGER.isInfoEnabled()) {
+                            MESSAGE_LOGGER.info(
+                                    "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
+                                            + "|uniqueId={}|cost={}",
+                                    result, currPushUrl, handleMsgContext.getTopic(),
+                                    handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+                        }
                         if (result == ClientRetCode.OK || result == ClientRetCode.REMOTE_OK) {
                             complete();
                             if (isComplete()) {
@@ -244,10 +249,12 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
                         }
                     } else {
                         eventMeshHTTPServer.metrics.getSummaryMetrics().recordHttpPushMsgFailed();
-                        messageLogger.info(
-                            "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
-                                + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
-                            handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+                        if (MESSAGE_LOGGER.isInfoEnabled()) {
+                            MESSAGE_LOGGER.info(
+                                    "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
+                                            + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
+                                    handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+                        }
 
                         if (isComplete()) {
                             handleMsgContext.finish();
@@ -257,18 +264,20 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
                 }
             });
 
-            if (messageLogger.isDebugEnabled()) {
-                messageLogger.debug("message|eventMesh2client|url={}|topic={}|event={}", currPushUrl,
-                    handleMsgContext.getTopic(),
-                    handleMsgContext.getEvent());
+            if (MESSAGE_LOGGER.isDebugEnabled()) {
+                MESSAGE_LOGGER.debug("message|eventMesh2client|url={}|topic={}|event={}", currPushUrl,
+                        handleMsgContext.getTopic(),
+                        handleMsgContext.getEvent());
             } else {
-                messageLogger
-                    .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
-                        currPushUrl, handleMsgContext.getTopic(),
-                        handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
+                if (MESSAGE_LOGGER.isInfoEnabled()) {
+                    MESSAGE_LOGGER
+                            .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
+                                    currPushUrl, handleMsgContext.getTopic(),
+                                    handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
+                }
             }
         } catch (IOException e) {
-            messageLogger.error("push2client err", e);
+            MESSAGE_LOGGER.error("push2client err", e);
             removeWaitingMap(this);
             delayRetry();
             if (isComplete()) {
@@ -281,22 +290,22 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("asyncPushRequest={")
-            .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
-            .append(",startIdx=").append(startIdx)
-            .append(",retryTimes=").append(retryTimes)
-            .append(",uniqueId=").append(handleMsgContext.getUniqueId())
-            .append(",executeTime=")
-            .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
-            .append(",lastPushTime=")
-            .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
-            .append(",createTime=")
-            .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
+                .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
+                .append(",startIdx=").append(startIdx)
+                .append(",retryTimes=").append(retryTimes)
+                .append(",uniqueId=").append(handleMsgContext.getUniqueId())
+                .append(",executeTime=")
+                .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
+                .append(",lastPushTime=")
+                .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
+                .append(",createTime=")
+                .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
         return sb.toString();
     }
 
     boolean processResponseStatus(int httpStatus, HttpResponse httpResponse) {
         if (httpStatus == HttpStatus.SC_OK || httpStatus == HttpStatus.SC_CREATED
-            || httpStatus == HttpStatus.SC_NO_CONTENT || httpStatus == HttpStatus.SC_ACCEPTED) {
+                || httpStatus == HttpStatus.SC_NO_CONTENT || httpStatus == HttpStatus.SC_ACCEPTED) {
             // success http response
             return true;
         } else if (httpStatus == 429) {
@@ -326,8 +335,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
         try {
             Map<String, Object> ret =
-                JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
-                });
+                    JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
+                    });
             Integer retCode = (Integer) ret.get("retCode");
             if (retCode != null && ClientRetCode.contains(retCode)) {
                 return ClientRetCode.get(retCode);
@@ -335,16 +344,22 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
             return ClientRetCode.FAIL;
         } catch (NumberFormatException e) {
-            messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
-                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+            if (MESSAGE_LOGGER.isWarnEnabled()) {
+                MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
+                        handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+            }
             return ClientRetCode.FAIL;
         } catch (JsonException e) {
-            messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{},  httpResponse:{}", currPushUrl,
-                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+            if (MESSAGE_LOGGER.isWarnEnabled()) {
+                MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{},  httpResponse:{}", currPushUrl,
+                        handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+            }
             return ClientRetCode.FAIL;
         } catch (Throwable t) {
-            messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{},  httpResponse:{}", currPushUrl,
-                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+            if (MESSAGE_LOGGER.isWarnEnabled()) {
+                MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{},  httpResponse:{}", currPushUrl,
+                        handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+            }
             return ClientRetCode.FAIL;
         }
     }
@@ -355,7 +370,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
             return;
         }
         waitingRequests
-            .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
+                .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
         waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org