You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/04 13:31:08 UTC
[incubator-eventmesh] branch master updated: fix issue2371
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new bfadb00aa fix issue2371
new 3337a538c Merge pull request #2446 from jonyangx/issue2371
bfadb00aa is described below
commit bfadb00aa80a032605b8f81d96cb8a7e5268c59d
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sun Dec 4 09:23:45 2022 +0800
fix issue2371
---
.../protocol/http/push/AsyncHTTPPushRequest.java | 143 ++++++++++++---------
1 file changed, 79 insertions(+), 64 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index a2e5cdee0..fa8ff81c8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -73,11 +73,12 @@ import com.google.common.collect.Sets;
public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
- public Logger messageLogger = LoggerFactory.getLogger("message");
+ public static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger("message");
- public Logger cmdLogger = LoggerFactory.getLogger("cmd");
+ public static final Logger CMD_LOGGER = LoggerFactory.getLogger("cmd");
+
+ public static final Logger LOGGER = LoggerFactory.getLogger("AsyncHTTPPushRequest");
- public Logger logger = LoggerFactory.getLogger(this.getClass());
public String currPushUrl;
private Map<String, Set<AbstractHTTPPushRequest>> waitingRequests;
@@ -100,7 +101,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
String requestCode = "";
- if (SubscriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
+ if (SubscriptionType.SYNC == handleMsgContext.getSubscriptionItem().getType()) {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode());
} else {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
@@ -110,20 +111,20 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
- handleMsgContext.getEventMeshHTTPServer()
- .getEventMeshHttpConfiguration().eventMeshCluster);
+ handleMsgContext.getEventMeshHTTPServer()
+ .getEventMeshHttpConfiguration().eventMeshCluster);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
- handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
+ handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
- handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
+ handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
CloudEvent event = CloudEventBuilder.from(handleMsgContext.getEvent())
- .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
- String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.RSP_URL, currPushUrl)
- .withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup())
- .build();
+ .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .withExtension(EventMeshConstants.RSP_URL, currPushUrl)
+ .withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup())
+ .build();
handleMsgContext.setEvent(event);
String content = "";
@@ -133,7 +134,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
ProtocolTransportObject protocolTransportObject =
- protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
+ protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
if (protocolTransportObject instanceof HttpCommand) {
content = ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString();
} else {
@@ -149,7 +150,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
}
} catch (Exception ex) {
- logger.error("Failed to convert EventMeshMessage from CloudEvent", ex);
+ LOGGER.error("Failed to convert EventMeshMessage from CloudEvent", ex);
return;
}
@@ -157,25 +158,25 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content));
if (StringUtils.isBlank(handleMsgContext.getBizSeqNo())) {
body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
- RandomStringUtils.generateNum(20)));
+ RandomStringUtils.generateNum(20)));
} else {
body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
- handleMsgContext.getBizSeqNo()));
+ handleMsgContext.getBizSeqNo()));
}
if (StringUtils.isBlank(handleMsgContext.getUniqueId())) {
body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
- RandomStringUtils.generateNum(20)));
+ RandomStringUtils.generateNum(20)));
} else {
body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
- handleMsgContext.getUniqueId()));
+ handleMsgContext.getUniqueId()));
}
body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO,
- handleMsgContext.getMsgRandomNo()));
+ handleMsgContext.getMsgRandomNo()));
body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic()));
body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS,
- JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
+ JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
HttpEntity httpEntity = new UrlEncodedFormEntity(body, StandardCharsets.UTF_8);
@@ -183,10 +184,10 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
// for CloudEvents Webhook spec
String urlAuthType = handleMsgContext.getConsumerGroupConfig().getConsumerGroupTopicConf()
- .get(handleMsgContext.getTopic()).getHttpAuthTypeMap().get(currPushUrl);
+ .get(handleMsgContext.getTopic()).getHttpAuthTypeMap().get(currPushUrl);
WebhookUtil.setWebhookHeaders(builder, httpEntity.getContentType().getValue(), eventMeshHttpConfiguration.eventMeshWebhookOrigin,
- urlAuthType);
+ urlAuthType);
eventMeshHTTPServer.metrics.getSummaryMetrics().recordPushMsg();
@@ -194,8 +195,10 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
addToWaitingMap(this);
- cmdLogger.info("cmd={}|eventMesh2client|from={}|to={}", requestCode,
- IPUtils.getLocalAddress(), currPushUrl);
+ if (CMD_LOGGER.isInfoEnabled()) {
+ CMD_LOGGER.info("cmd={}|eventMesh2client|from={}|to={}", requestCode,
+ IPUtils.getLocalAddress(), currPushUrl);
+ }
try {
eventMeshHTTPServer.httpClientPool.getClient().execute(builder, new ResponseHandler<Object>() {
@@ -210,17 +213,19 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
String res = "";
try {
res = EntityUtils.toString(response.getEntity(),
- Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
+ Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
} catch (IOException e) {
handleMsgContext.finish();
return new Object();
}
ClientRetCode result = processResponseContent(res);
- messageLogger.info(
- "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
- + "|uniqueId={}|cost={}",
- result, currPushUrl, handleMsgContext.getTopic(),
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+ if (MESSAGE_LOGGER.isInfoEnabled()) {
+ MESSAGE_LOGGER.info(
+ "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
+ + "|uniqueId={}|cost={}",
+ result, currPushUrl, handleMsgContext.getTopic(),
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+ }
if (result == ClientRetCode.OK || result == ClientRetCode.REMOTE_OK) {
complete();
if (isComplete()) {
@@ -244,10 +249,12 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
}
} else {
eventMeshHTTPServer.metrics.getSummaryMetrics().recordHttpPushMsgFailed();
- messageLogger.info(
- "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
- + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+ if (MESSAGE_LOGGER.isInfoEnabled()) {
+ MESSAGE_LOGGER.info(
+ "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
+ + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+ }
if (isComplete()) {
handleMsgContext.finish();
@@ -257,18 +264,20 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
}
});
- if (messageLogger.isDebugEnabled()) {
- messageLogger.debug("message|eventMesh2client|url={}|topic={}|event={}", currPushUrl,
- handleMsgContext.getTopic(),
- handleMsgContext.getEvent());
+ if (MESSAGE_LOGGER.isDebugEnabled()) {
+ MESSAGE_LOGGER.debug("message|eventMesh2client|url={}|topic={}|event={}", currPushUrl,
+ handleMsgContext.getTopic(),
+ handleMsgContext.getEvent());
} else {
- messageLogger
- .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
- currPushUrl, handleMsgContext.getTopic(),
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
+ if (MESSAGE_LOGGER.isInfoEnabled()) {
+ MESSAGE_LOGGER
+ .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
+ currPushUrl, handleMsgContext.getTopic(),
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
+ }
}
} catch (IOException e) {
- messageLogger.error("push2client err", e);
+ MESSAGE_LOGGER.error("push2client err", e);
removeWaitingMap(this);
delayRetry();
if (isComplete()) {
@@ -281,22 +290,22 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("asyncPushRequest={")
- .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
- .append(",startIdx=").append(startIdx)
- .append(",retryTimes=").append(retryTimes)
- .append(",uniqueId=").append(handleMsgContext.getUniqueId())
- .append(",executeTime=")
- .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
- .append(",lastPushTime=")
- .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
- .append(",createTime=")
- .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
+ .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
+ .append(",startIdx=").append(startIdx)
+ .append(",retryTimes=").append(retryTimes)
+ .append(",uniqueId=").append(handleMsgContext.getUniqueId())
+ .append(",executeTime=")
+ .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
+ .append(",lastPushTime=")
+ .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
+ .append(",createTime=")
+ .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
return sb.toString();
}
boolean processResponseStatus(int httpStatus, HttpResponse httpResponse) {
if (httpStatus == HttpStatus.SC_OK || httpStatus == HttpStatus.SC_CREATED
- || httpStatus == HttpStatus.SC_NO_CONTENT || httpStatus == HttpStatus.SC_ACCEPTED) {
+ || httpStatus == HttpStatus.SC_NO_CONTENT || httpStatus == HttpStatus.SC_ACCEPTED) {
// success http response
return true;
} else if (httpStatus == 429) {
@@ -326,8 +335,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
try {
Map<String, Object> ret =
- JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
- });
+ JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
+ });
Integer retCode = (Integer) ret.get("retCode");
if (retCode != null && ClientRetCode.contains(retCode)) {
return ClientRetCode.get(retCode);
@@ -335,16 +344,22 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
return ClientRetCode.FAIL;
} catch (NumberFormatException e) {
- messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ if (MESSAGE_LOGGER.isWarnEnabled()) {
+ MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ }
return ClientRetCode.FAIL;
} catch (JsonException e) {
- messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ if (MESSAGE_LOGGER.isWarnEnabled()) {
+ MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ }
return ClientRetCode.FAIL;
} catch (Throwable t) {
- messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ if (MESSAGE_LOGGER.isWarnEnabled()) {
+ MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ }
return ClientRetCode.FAIL;
}
}
@@ -355,7 +370,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
return;
}
waitingRequests
- .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
+ .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org