You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/09/30 09:28:54 UTC
[incubator-eventmesh] branch develop updated: [Issue #528] enhance
Retry support for HTTP and TCP processors. (#529)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new b9abbcd [Issue #528] enhance Retry support for HTTP and TCP processors. (#529)
b9abbcd is described below
commit b9abbcd1051e4c7a200788faa658c3e8a84f2466
Author: jinrongluo <ka...@gmail.com>
AuthorDate: Thu Sep 30 05:28:50 2021 -0400
[Issue #528] enhance Retry support for HTTP and TCP processors. (#529)
* [Issue #337] Fix HttpSubscriber startup issue
* [Issue #337] test commit
* [Issue #337] revert test commit
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Address code review comment for Subscriber Demo App
* [Issue #528] enhance Retry support for HTTP and TCP processors.
* [Issue #528] fixing checkstyle issue
* [Issue #528] Fix taskExecuteTime in TCP ClientGroupWrapper UpStreamMsgContext
Co-authored-by: j00441484 <ji...@huawei.com>
---
.../eventmesh/runtime/boot/EventMeshTCPServer.java | 2 +-
.../http/processor/SendAsyncMessageProcessor.java | 4 +
.../http/processor/SendSyncMessageProcessor.java | 4 +
.../tcp/client/group/ClientGroupWrapper.java | 32 ++-
.../client/session/push/DownStreamMsgContext.java | 101 +++++++---
.../session/push/retry/EventMeshTcpRetryer.java | 217 ---------------------
.../client/session/retry/EventMeshTcpRetryer.java | 127 ++++++++++++
.../tcp/client/session/retry/RetryContext.java | 57 ++++++
.../tcp/client/session/send/SessionSender.java | 15 +-
.../client/session/send/UpStreamMsgContext.java | 115 +++++++++--
.../tcp/client/task/MessageTransferTask.java | 22 ++-
11 files changed, 397 insertions(+), 299 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index 4620ce7..6cbb130 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -44,7 +44,7 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcpMessage
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventMeshRebalanceService;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventmeshRebalanceImpl;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry.EventMeshTcpRetryer;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
import org.apache.eventmesh.runtime.registry.Registry;
import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index d64ef6b..3649dd0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -245,6 +245,8 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(context.getException(), 2)));
asyncContext.onComplete(err, handler);
+
+ eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
long endTime = System.currentTimeMillis();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
@@ -261,6 +263,8 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2)));
asyncContext.onComplete(err);
+
+ eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
long endTime = System.currentTimeMillis();
messageLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
endTime - startTime,
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 5aa623d..38f3f25 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -254,6 +254,8 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(err, handler);
+
+ eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
messageLogger.error("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime,
sendMessageRequestBody.getTopic(),
@@ -267,6 +269,8 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2)));
asyncContext.onComplete(err);
+
+ eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
long endTime = System.currentTimeMillis();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 311d6eb..87f7b9b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -17,26 +17,8 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.group;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import com.alibaba.fastjson.JSON;
-
-import io.openmessaging.api.AsyncConsumeContext;
-import io.openmessaging.api.AsyncMessageListener;
-import io.openmessaging.api.Message;
-import io.openmessaging.api.OnExceptionContext;
-import io.openmessaging.api.SendCallback;
-import io.openmessaging.api.SendResult;
-
+import io.openmessaging.api.*;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.EventMeshAction;
@@ -53,7 +35,7 @@ import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry.EventMeshTcpRetryer;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext;
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
@@ -61,6 +43,12 @@ import org.apache.eventmesh.runtime.util.HttpTinyClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
public class ClientGroupWrapper {
public static Logger logger = LoggerFactory.getLogger(ClientGroupWrapper.class);
@@ -752,7 +740,9 @@ public class ClientGroupWrapper {
String topic = msg.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
logger.warn("send msg back to broker, bizSeqno:{}, topic:{}", bizSeqNo, topic);
- send(new UpStreamMsgContext(null, null, msg), new SendCallback() {
+ long startTime = System.currentTimeMillis();
+ long taskExcuteTime = startTime;
+ send(new UpStreamMsgContext(null, msg, null, startTime, taskExcuteTime), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
logger.info("consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, topic:{}", consumerGroup, bizSeqNo, topic);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
index 712e6fa..5b86ba3 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
@@ -17,45 +17,36 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
import io.openmessaging.api.Message;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.RetryContext;
+import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.ServerGlobal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
-public class DownStreamMsgContext implements Delayed {
+public class DownStreamMsgContext extends RetryContext {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- public String seq;
-
- public Message msgExt;
-
public Session session;
public AbstractContext consumeConcurrentlyContext;
public MQConsumerWrapper consumer;
- public int retryTimes;
-
public SubscriptionItem subscriptionItem;
- private long executeTime;
-
public long lastPushTime;
private long createTime;
@@ -68,11 +59,9 @@ public class DownStreamMsgContext implements Delayed {
this.seq = String.valueOf(ServerGlobal.getInstance().getMsgCounter().incrementAndGet());
this.msgExt = msgExt;
this.session = session;
- this.retryTimes = 0;
this.consumer = consumer;
this.consumeConcurrentlyContext = consumeConcurrentlyContext;
this.lastPushTime = System.currentTimeMillis();
- this.executeTime = System.currentTimeMillis();
this.createTime = System.currentTimeMillis();
this.subscriptionItem = subscriptionItem;
String ttlStr = msgExt.getUserProperties("TTL");
@@ -99,10 +88,6 @@ public class DownStreamMsgContext implements Delayed {
}
}
- public void delay(long delay) {
- this.executeTime = System.currentTimeMillis() + (retryTimes + 1) * delay;
- }
-
@Override
public String toString() {
return "DownStreamMsgContext{" +
@@ -119,19 +104,71 @@ public class DownStreamMsgContext implements Delayed {
}
@Override
- public int compareTo(Delayed delayed) {
- DownStreamMsgContext context = (DownStreamMsgContext) delayed;
- if (this.executeTime > context.executeTime) {
- return 1;
- } else if (this.executeTime == context.executeTime) {
- return 0;
- } else {
- return -1;
+ public void retry() { // one retry attempt: re-send this message downstream unless its TTL has elapsed
+ try {
+ logger.info("retry downStream msg start,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt));
+
+ if (isRetryMsgTimeout(this)) { // timed out: isRetryMsgTimeout has already logged and acked the message
+ return;
+ }
+ this.retryTimes++; // count this attempt before sending
+ this.lastPushTime = System.currentTimeMillis();
+ // Non-broadcast subscriptions re-select a session via the group's dispatch strategy; broadcast keeps the current session.
+ Session rechoosen = null;
+ String topic = this.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+ if (!SubscriptionMode.BROADCASTING.equals(this.subscriptionItem.getMode())) {
+ rechoosen = this.session.getClientGroupWrapper()
+ .get().getDownstreamDispatchStrategy().select(this.session.getClientGroupWrapper().get().getSysId()
+ , topic
+ , this.session.getClientGroupWrapper().get().getGroupConsumerSessions());
+ } else {
+ rechoosen = this.session;
+ }
+
+ if (rechoosen == null) { // no session available: only a warning is logged; nothing is re-queued or acked here
+ logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt));
+ } else {
+ this.session = rechoosen; // NOTE(review): the removed retryHandle() also called rechoosen.getPusher().unAckMsg(seq, ctx) before downstreamMsg - confirm dropping it is intended
+ rechoosen.downstreamMsg(this);
+ logger.info("retry downStream msg end,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt));
+ }
+ } catch (Exception e) { // any failure is logged and swallowed; it is not rethrown to the caller
+ logger.error("retry-dispatcher error!", e);
}
}
- @Override
- public long getDelay(TimeUnit unit) {
- return unit.convert(this.executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) { // true (and the message is acked) when brokerCost + accessCost >= TTL
+ boolean flag = false;
+ String ttlStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL);
+ long ttl = StringUtils.isNumeric(ttlStr)? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;; // default timeout when TTL is not numeric; NOTE(review): stray second ';'
+ // brokerCost = LEAVE_TIME - STORE_TIME (store time counts as 0 when not numeric; cost is 0 when leave time is not numeric)
+ String storeTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.STORE_TIME);
+ long storeTimestamp = StringUtils.isNumeric(storeTimeStr)? Long.parseLong(storeTimeStr) : 0;
+ String leaveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.LEAVE_TIME);
+ long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0;
+ // accessCost = now - ARRIVE_TIME (0 when arrive time is not numeric)
+ String arriveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.ARRIVE_TIME);
+ long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.parseLong(arriveTimeStr) : 0;
+ double elapseTime = brokerCost + accessCost; // NOTE(review): sum of two longs stored in a double; a long would avoid the conversion
+ if (elapseTime >= ttl) {
+ logger.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
+ flag = true;
+ eventMeshAckMsg(downStreamMsgContext); // the discarded message is acked via consumer.updateOffset
+ }
+ return flag;
+ }
+
+ /**
+ * Acknowledges the context's message from the EventMesh side by updating the consumer offset.
+ *
+ * @param downStreamMsgContext context supplying the message, the consumer and the consume context
+ */
+ private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) {
+ List<Message> msgExts = new ArrayList<Message>(); // updateOffset is called with a list, so wrap the single message
+ msgExts.add(downStreamMsgContext.msgExt);
+ logger.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION),
+ downStreamMsgContext.seq, downStreamMsgContext.msgExt.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS));
+ downStreamMsgContext.consumer.updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext);
}
+
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
deleted file mode 100644
index cf4e3b7..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry;
-
-import io.openmessaging.api.Message;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.common.protocol.SubscriptionType;
-import org.apache.eventmesh.common.protocol.SubscriptionMode;
-import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
-import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
-import org.apache.eventmesh.runtime.util.EventMeshUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class EventMeshTcpRetryer {
-
- public static Logger logger = LoggerFactory.getLogger(EventMeshTcpRetryer.class);
-
- private EventMeshTCPServer eventMeshTCPServer;
-
- private DelayQueue<DownStreamMsgContext> retrys = new DelayQueue<DownStreamMsgContext>();
-
- private ThreadPoolExecutor pool = new ThreadPoolExecutor(3,
- 3,
- 60000,
- TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000),
- new EventMeshThreadFactoryImpl("eventMesh-tcp-retry", true),
- new ThreadPoolExecutor.AbortPolicy());
-
- private Thread dispatcher;
-
- public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
- this.eventMeshTCPServer = eventMeshTCPServer;
- }
-
- public EventMeshTCPServer getEventMeshTCPServer() {
- return eventMeshTCPServer;
- }
-
- public void setEventMeshTCPServer(EventMeshTCPServer eventMeshTCPServer) {
- this.eventMeshTCPServer = eventMeshTCPServer;
- }
-
- public void pushRetry(DownStreamMsgContext downStreamMsgContext) {
- if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize) {
- logger.error("pushRetry fail,retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}",
- eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize, downStreamMsgContext.retryTimes,
- downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
- return;
- }
-
- int maxRetryTimes = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
- ? eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes
- : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
- if (downStreamMsgContext.retryTimes >= maxRetryTimes) {
- logger.warn("pushRetry fail,retry over maxRetryTimes:{}, pushType: {}, retryTimes:{}, seq:{}, bizSeq:{}", maxRetryTimes, downStreamMsgContext.subscriptionItem.getType(),
- downStreamMsgContext.retryTimes, downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
- return;
- }
-
- retrys.offer(downStreamMsgContext);
- logger.info("pushRetry success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes,
- EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
- }
-
- public void init() {
- dispatcher = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- DownStreamMsgContext downStreamMsgContext = null;
- while ((downStreamMsgContext = retrys.take()) != null) {
- final DownStreamMsgContext finalDownStreamMsgContext = downStreamMsgContext;
- pool.execute(() -> {
- retryHandle(finalDownStreamMsgContext);
- });
- }
- } catch (Exception e) {
- logger.error("retry-dispatcher error!", e);
- }
- }
- }, "retry-dispatcher");
- dispatcher.setDaemon(true);
- logger.info("EventMeshTcpRetryer inited......");
- }
-
- private void retryHandle(DownStreamMsgContext downStreamMsgContext) {
- try {
- logger.info("retry downStream msg start,seq:{},retryTimes:{},bizSeq:{}", downStreamMsgContext.seq,
- downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-
- if (isRetryMsgTimeout(downStreamMsgContext)) {
- return;
- }
- downStreamMsgContext.retryTimes++;
- downStreamMsgContext.lastPushTime = System.currentTimeMillis();
-
- Session rechoosen = null;
- String topic = downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
- if (!SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
- rechoosen = downStreamMsgContext.session.getClientGroupWrapper()
- .get().getDownstreamDispatchStrategy().select(downStreamMsgContext.session.getClientGroupWrapper().get().getSysId()
- , topic
- , downStreamMsgContext.session.getClientGroupWrapper().get().getGroupConsumerSessions());
- } else {
- rechoosen = downStreamMsgContext.session;
- }
-
- if (rechoosen == null) {
- logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
- downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-
-// //Need to manually ack the message that did not send a successful message
-// eventMeshAckMsg(downStreamMsgContext);
-
-// //Retry cannot find the delivered session, no longer post back to the broker or retry other event Mesh
-// String bizSeqNo = finalDownStreamMsgContext.msgExt.getKeys();
-// String uniqueId = MapUtils.getString(finalDownStreamMsgContext.msgExt.getProperties(), WeMQConstant.RMB_UNIQ_ID, "");
-// if(EventMeshTCPServer.getAccessConfiguration().eventMeshTcpSendBackEnabled){
-// sendMsgBackToBroker(finalDownStreamMsgContext.msgExt, bizSeqNo, uniqueId);
-// }else{
-// // TODO: Push the message to other EventMesh instances. To be determined.
-// sendMsgToOtherEventMesh(finalDownStreamMsgContext.msgExt, bizSeqNo, uniqueId);
-// }
- } else {
- downStreamMsgContext.session = rechoosen;
- rechoosen.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
- rechoosen.downstreamMsg(downStreamMsgContext);
- logger.info("retry downStream msg end,seq:{},retryTimes:{},bizSeq:{}", downStreamMsgContext.seq,
- downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
- }
- } catch (Exception e) {
- logger.error("retry-dispatcher error!", e);
- }
- }
-
- private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) {
- boolean flag = false;
- String ttlStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL);
- long ttl = StringUtils.isNumeric(ttlStr)? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;;
-
- String storeTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.STORE_TIME);
- long storeTimestamp = StringUtils.isNumeric(storeTimeStr)? Long.parseLong(storeTimeStr) : 0;
- String leaveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.LEAVE_TIME);
- long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0;
-
- String arriveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.ARRIVE_TIME);
- long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.parseLong(arriveTimeStr) : 0;
- double elapseTime = brokerCost + accessCost;
- if (elapseTime >= ttl) {
- logger.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
- downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
- flag = true;
- eventMeshAckMsg(downStreamMsgContext);
- }
- return flag;
- }
-
- public void start() throws Exception {
- dispatcher.start();
- logger.info("EventMeshTcpRetryer started......");
- }
-
- public void shutdown() {
- pool.shutdown();
- logger.info("EventMeshTcpRetryer shutdown......");
- }
-
- public int getRetrySize() {
- return retrys.size();
- }
-
- /**
- * eventMesh ack msg
- *
- * @param downStreamMsgContext
- */
- private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) {
- List<Message> msgExts = new ArrayList<Message>();
- msgExts.add(downStreamMsgContext.msgExt);
- logger.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION),
- downStreamMsgContext.seq, downStreamMsgContext.msgExt.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS));
- downStreamMsgContext.consumer.updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext);
-// ConsumeMessageService consumeMessageService = downStreamMsgContext.consumer.getDefaultMQPushConsumer().getDefaultMQPushConsumerImpl().getConsumeMessageService();
-// ((ConsumeMessageConcurrentlyService)consumeMessageService).updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext);
- }
-
- public void printRetryThreadPoolState() {
-// ThreadPoolHelper.printState(pool);
- }
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
new file mode 100644
index 0000000..c43ef4f
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;
+
+import org.apache.eventmesh.common.protocol.SubscriptionType;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
+import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
+import org.apache.eventmesh.runtime.util.EventMeshUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class EventMeshTcpRetryer { // holds RetryContext objects in a DelayQueue; a dispatcher thread hands due ones to a worker pool that calls retry()
+
+ public static Logger logger = LoggerFactory.getLogger(EventMeshTcpRetryer.class);
+
+ private EventMeshTCPServer eventMeshTCPServer; // source of the retry queue size and max-retry settings read in pushRetry
+
+ private DelayQueue<RetryContext> retrys = new DelayQueue<RetryContext>(); // pending retries; take() only yields a context once its getDelay() has expired
+ // Worker pool running RetryContext.retry(): 3 core / 3 max threads, 1000-slot task queue, AbortPolicy when full.
+ private ThreadPoolExecutor pool = new ThreadPoolExecutor(3,
+ 3,
+ 60000,
+ TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000), // keep-alive of 60000 ms
+ new EventMeshThreadFactoryImpl("eventMesh-tcp-retry", true),
+ new ThreadPoolExecutor.AbortPolicy());
+
+ private Thread dispatcher; // created in init(), started in start()
+
+ public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
+ this.eventMeshTCPServer = eventMeshTCPServer;
+ }
+
+ public EventMeshTCPServer getEventMeshTCPServer() {
+ return eventMeshTCPServer;
+ }
+
+ public void setEventMeshTCPServer(EventMeshTCPServer eventMeshTCPServer) {
+ this.eventMeshTCPServer = eventMeshTCPServer;
+ }
+ // Queues a context for retry unless the queue is full or the context has reached its retry limit; in both cases it is dropped with a log entry only.
+ public void pushRetry(RetryContext retryContext) {
+ if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize) { // queue at configured capacity: reject
+ logger.error("pushRetry fail,retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}",
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize, retryContext.retryTimes,
+ retryContext.seq, EventMeshUtil.getMessageBizSeq(retryContext.msgExt));
+ return;
+ }
+
+ int maxRetryTimes = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes; // default limit for contexts that are not DownStreamMsgContext
+ if (retryContext instanceof DownStreamMsgContext) { // downstream contexts pick the sync or async limit from their subscription type
+ DownStreamMsgContext downStreamMsgContext = (DownStreamMsgContext) retryContext;
+ maxRetryTimes = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ?
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes :
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
+ }
+
+ if (retryContext.retryTimes >= maxRetryTimes) { // retry limit reached: reject
+ logger.warn("pushRetry fail,retry over maxRetryTimes:{}, retryTimes:{}, seq:{}, bizSeq:{}", maxRetryTimes,
+ retryContext.retryTimes, retryContext.seq, EventMeshUtil.getMessageBizSeq(retryContext.msgExt));
+ return;
+ }
+
+ retrys.offer(retryContext); // NOTE(review): the size check above and this offer are separate steps, so the size limit is not enforced atomically
+ logger.info("pushRetry success,seq:{}, retryTimes:{}, bizSeq:{}", retryContext.seq, retryContext.retryTimes,
+ EventMeshUtil.getMessageBizSeq(retryContext.msgExt));
+ }
+ // Creates (but does not start) the daemon dispatcher thread that moves due contexts from the queue to the pool.
+ public void init() {
+ dispatcher = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ RetryContext retryContext = null;
+ while ((retryContext = retrys.take()) != null) { // take() blocks until the head context is due
+ final RetryContext finalRetryContext = retryContext;
+ pool.execute(() -> { // NOTE(review): with AbortPolicy, execute() throws RejectedExecutionException when the pool's queue is full
+ finalRetryContext.retry();
+ });
+ }
+ } catch (Exception e) { // NOTE(review): the loop is inside this try, so any exception (including a rejected execute) ends the dispatcher thread and later retries are never dispatched - confirm intended
+ logger.error("retry-dispatcher error!", e);
+ }
+ }
+ }, "retry-dispatcher");
+ dispatcher.setDaemon(true);
+ logger.info("EventMeshTcpRetryer inited......");
+ }
+ // Starts the dispatcher thread; the thread is only created in init(), so init() has to run first.
+ public void start() throws Exception {
+ dispatcher.start();
+ logger.info("EventMeshTcpRetryer started......");
+ }
+ // Shuts down the worker pool only. NOTE(review): the dispatcher thread is not stopped or interrupted here.
+ public void shutdown() {
+ pool.shutdown();
+ logger.info("EventMeshTcpRetryer shutdown......");
+ }
+ // Returns the number of contexts currently held in the retry queue.
+ public int getRetrySize() {
+ return retrys.size();
+ }
+ // Currently a no-op: its only statement is commented out.
+ public void printRetryThreadPoolState() {
+// ThreadPoolHelper.printState(pool);
+ }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/RetryContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/RetryContext.java
new file mode 100644
index 0000000..04d8674
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/RetryContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;
+
+import io.openmessaging.api.Message;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for a unit of work that can be retried at a later point in time.
+ *
+ * <p>It implements {@link Delayed}: an instance is due once {@link #executeTime} has passed,
+ * and instances are ordered by {@link #executeTime}.
+ */
+public abstract class RetryContext implements Delayed {
+
+    /** Message this context retries. */
+    public Message msgExt;
+
+    /** Sequence identifier of the message. */
+    public String seq;
+
+    /** Retry counter; also scales the delay computed by {@link #delay(long)}. */
+    public int retryTimes = 0;
+
+    /** Wall-clock time in milliseconds at which this context becomes due. */
+    public long executeTime = System.currentTimeMillis();
+
+    /**
+     * Schedules this context {@code (retryTimes + 1) * delay} milliseconds from now.
+     *
+     * @param delay base delay in milliseconds
+     * @return this context, to allow chaining
+     */
+    public RetryContext delay(long delay) {
+        final long scaledDelay = (retryTimes + 1) * delay;
+        this.executeTime = System.currentTimeMillis() + scaledDelay;
+        return this;
+    }
+
+    /**
+     * Orders contexts by {@link #executeTime}, earliest first.
+     *
+     * @param delayed the other element; it is cast to {@code RetryContext}
+     * @return -1, 0 or 1
+     */
+    @Override
+    public int compareTo(Delayed delayed) {
+        final RetryContext other = (RetryContext) delayed;
+        return Long.compare(this.executeTime, other.executeTime);
+    }
+
+    /**
+     * Returns the time left until {@link #executeTime}, converted to {@code unit}.
+     * The result is zero or negative once the context is due.
+     */
+    @Override
+    public long getDelay(TimeUnit unit) {
+        final long remainingMillis = this.executeTime - System.currentTimeMillis();
+        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /** Performs one retry attempt. */
+    public abstract void retry();
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
index 6378fa2..e9e1fc5 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
@@ -17,13 +17,8 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import io.openmessaging.api.Message;
import io.openmessaging.api.SendCallback;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.RRCallback;
@@ -39,6 +34,10 @@ import org.apache.eventmesh.runtime.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
public class SessionSender {
private final Logger messageLogger = LoggerFactory.getLogger("message");
@@ -81,7 +80,7 @@ public class SessionSender {
Command cmd = header.getCommand();
if (Command.REQUEST_TO_SERVER == cmd) {
long ttl = msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL) != null ? Long.parseLong(msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL)) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
- upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg);
+ upStreamMsgContext = new UpStreamMsgContext(session, msg, header, startTime, taskExecuteTime);
session.getClientGroupWrapper().get().request(upStreamMsgContext, initSyncRRCallback(header, startTime, taskExecuteTime), ttl);
upstreamBuff.release();
} else if (Command.RESPONSE_TO_SERVER == cmd) {
@@ -98,11 +97,11 @@ public class SessionSender {
// MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, msg.getProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID));
// MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, msg.getProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO));
- upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg);
+ upStreamMsgContext = new UpStreamMsgContext(session, msg, header, startTime, taskExecuteTime);
session.getClientGroupWrapper().get().reply(upStreamMsgContext);
upstreamBuff.release();
} else {
- upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg);
+ upStreamMsgContext = new UpStreamMsgContext(session, msg, header, startTime, taskExecuteTime);
session.getClientGroupWrapper().get().send(upStreamMsgContext, sendCallback);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
index 1618ce7..48be15c 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
@@ -18,26 +18,43 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send;
import io.openmessaging.api.Message;
-
+import io.openmessaging.api.OnExceptionContext;
+import io.openmessaging.api.SendCallback;
+import io.openmessaging.api.SendResult;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.tcp.*;
+import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.RetryContext;
+import org.apache.eventmesh.runtime.util.EventMeshUtil;
+import org.apache.eventmesh.runtime.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpStreamMsgContext extends RetryContext {
-public class UpStreamMsgContext {
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private Session session;
- private Message msg;
+ private long createTime = System.currentTimeMillis();
- private String seq;
+ private Header header;
- private long createTime = System.currentTimeMillis();
+ private long startTime;
+
+ private long taskExecuteTime;
+ /**
+ * Creates the context for one upstream message.
+ *
+ * @param session session the message was received on
+ * @param msg message to send; stored in {@code msgExt}
+ * @param header header of the received package; its seq becomes this context's {@code seq}
+ * @param startTime passed on to the session and to the reply write when retrying
+ * @param taskExecuteTime passed on to the session and to the reply write when retrying
+ */
- public UpStreamMsgContext(String seq, Session session, Message msg) {
- this.seq = seq;
+ public UpStreamMsgContext(Session session, Message msg, Header header, long startTime, long taskExecuteTime) {
+ this.header = header;
+ this.seq = header.getSeq();
this.session = session;
- this.msg = msg;
+ this.msgExt = msg;
+ this.startTime = startTime;
+ this.taskExecuteTime = taskExecuteTime;
}
public Session getSession() {
@@ -45,7 +62,7 @@ public class UpStreamMsgContext {
}
+ /** Returns the message carried by this context (held in {@code msgExt}). */
public Message getMsg() {
- return msg;
+ return this.msgExt;
}
public long getCreateTime() {
@@ -55,8 +72,84 @@ public class UpStreamMsgContext {
+ /**
+ * Returns a one-line description with seq, topic, client, retryTimes, createTime and
+ * executeTime, all enclosed in a single pair of braces.
+ */
@Override
public String toString() {
return "UpStreamMsgContext{seq=" + seq
- + ",topic=" + msg.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)
+ + ",topic=" + msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)
+ ",client=" + session.getClient()
- + ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + "}";
+ + ",retryTimes=" + retryTimes
+ + ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT)
+ + ",executeTime=" + DateFormatUtils.format(executeTime, EventMeshConstants.DATE_FORMAT) + "}";
+ }
+
+ /**
+  * Sends this upstream message again through the session.
+  *
+  * <p>A send result other than SUCCESS is raised as an exception carrying the result detail.
+  * That exception, like any other thrown here, is caught and logged; nothing propagates.
+  */
+ @Override
+ public void retry() {
+     logger.info("retry upStream msg start,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt));
+
+     try {
+         final Command ackCommand = getReplyCmd(header.getCommand());
+         final long sendTime = System.currentTimeMillis();
+         final EventMeshMessage encoded = EventMeshUtil.encodeMessage(msgExt);
+         final SendCallback callback = createSendCallback(ackCommand, taskExecuteTime, encoded);
+         final EventMeshTcpSendResult result = session.upstreamMsg(header, msgExt, callback, startTime, taskExecuteTime);
+
+         final boolean sent = StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), result.getSendStatus().name());
+         if (!sent) {
+             throw new Exception(result.getDetail());
+         }
+         logger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", header.getCommand(),
+             EventMeshUtil.printMqMessage(encoded), session.getClient(), taskExecuteTime - startTime, sendTime - startTime);
+     } catch (Exception e) {
+         logger.error("TCP UpstreamMsg Retry error", e);
+     }
+ }
+
+ /**
+  * Builds the callback passed to the session when this context re-sends its message.
+  *
+  * <p>On success the sender's upstream buffer is released and, for the async and broadcast
+  * ack commands, a SUCCESS reply is written back to the client. On failure the buffer is
+  * released, a follow-up retry is queued, the sender's failure counter is incremented and a
+  * FAIL reply is written back.
+  *
+  * @param replyCmd command placed in the header of the reply package
+  * @param taskExecuteTime passed to the reply write
+  * @param eventMeshMessage body of the reply package; also decoded to rebuild the message
+  *     for the follow-up retry
+  * @return the callback to hand to the session
+  */
+ protected SendCallback createSendCallback(Command replyCmd, long taskExecuteTime, EventMeshMessage eventMeshMessage) {
+     final long createTime = System.currentTimeMillis();
+     Package msg = new Package();
+
+     return new SendCallback() {
+         @Override
+         public void onSuccess(SendResult sendResult) {
+             session.getSender().getUpstreamBuff().release();
+             logger.info("upstreamMsg message success|user={}|callback cost={}", session.getClient(),
+                 String.valueOf(System.currentTimeMillis() - createTime));
+             if (replyCmd.equals(Command.BROADCAST_MESSAGE_TO_SERVER_ACK) || replyCmd.equals(Command
+                 .ASYNC_MESSAGE_TO_SERVER_ACK)) {
+                 msg.setHeader(new Header(replyCmd, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), seq));
+                 msg.setBody(eventMeshMessage);
+                 Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session);
+             }
+         }
+
+         @Override
+         public void onException(OnExceptionContext context) {
+             session.getSender().getUpstreamBuff().release();
+
+             // retry
+             UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext(
+                 session, EventMeshUtil.decodeMessage(eventMeshMessage), header, startTime, taskExecuteTime);
+             // Carry the attempt count forward. A fresh context starts at 0, so without this the
+             // retryTimes >= maxRetryTimes check in pushRetry never trips and a persistently
+             // failing send is retried forever. It must be set before delay(), which scales by it.
+             upStreamMsgContext.retryTimes = UpStreamMsgContext.this.retryTimes + 1;
+             upStreamMsgContext.delay(10000);
+             session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(upStreamMsgContext);
+
+             session.getSender().failMsgCount.incrementAndGet();
+             logger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(), String.valueOf
+                 (System.currentTimeMillis() - createTime), new Exception(context.getException()));
+             msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), context.getException().toString(), seq));
+             msg.setBody(eventMeshMessage);
+             Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session);
+         }
+
+     };
+ }
+
+ /**
+  * Maps an upstream command to the command used for the reply to the client.
+  *
+  * @param cmd command taken from the received header
+  * @return the matching reply command, or {@code cmd} itself when no mapping is defined
+  */
+ private Command getReplyCmd(Command cmd) {
+     final Command reply;
+     switch (cmd) {
+         case REQUEST_TO_SERVER:
+             reply = Command.RESPONSE_TO_CLIENT;
+             break;
+         case ASYNC_MESSAGE_TO_SERVER:
+             reply = Command.ASYNC_MESSAGE_TO_SERVER_ACK;
+             break;
+         case BROADCAST_MESSAGE_TO_SERVER:
+             reply = Command.BROADCAST_MESSAGE_TO_SERVER_ACK;
+             break;
+         default:
+             reply = cmd;
+             break;
+     }
+     return reply;
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index 87eeed0..b0c79fa 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -17,34 +17,31 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
-import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER;
-
-import java.util.concurrent.TimeUnit;
-
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.SendResult;
-
import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendResult;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendStatus;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER;
+
public class MessageTransferTask extends AbstractTask {
private final Logger messageLogger = LoggerFactory.getLogger("message");
@@ -158,6 +155,13 @@ public class MessageTransferTask extends AbstractTask {
@Override
public void onException(OnExceptionContext context) {
session.getSender().getUpstreamBuff().release();
+
+ // retry
+ UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext(
+ session, EventMeshUtil.decodeMessage(eventMeshMessage), pkg.getHeader(), startTime, taskExecuteTime);
+ upStreamMsgContext.delay(10000);
+ session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(upStreamMsgContext);
+
session.getSender().failMsgCount.incrementAndGet();
messageLogger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(), String.valueOf
(System.currentTimeMillis() - createTime), new Exception(context.getException()));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org