You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/09/30 09:28:54 UTC

[incubator-eventmesh] branch develop updated: [Issue #528] enhance Retry support for HTTP and TCP processors. (#529)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/develop by this push:
     new b9abbcd  [Issue #528] enhance Retry support for HTTP and TCP processors. (#529)
b9abbcd is described below

commit b9abbcd1051e4c7a200788faa658c3e8a84f2466
Author: jinrongluo <ka...@gmail.com>
AuthorDate: Thu Sep 30 05:28:50 2021 -0400

    [Issue #528] enhance Retry support for HTTP and TCP processors. (#529)
    
    * [Issue #337] Fix HttpSubscriber startup issue
    
    * [Issue #337] test commit
    
    * [Issue #337] revert test commit
    
    * [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
    
    * [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
    
    * [Issue #337] Address code review comment for Subscriber Demo App
    
    * [Issue #528] enhance Retry support for HTTP and TCP processors.
    
    * [Issue #528] fixing checkstyle issue
    
    * [Issue #528] Fix taskExecuteTime in TCP ClientGroupWrapper UpStreamMsgContext
    
    Co-authored-by: j00441484 <ji...@huawei.com>
---
 .../eventmesh/runtime/boot/EventMeshTCPServer.java |   2 +-
 .../http/processor/SendAsyncMessageProcessor.java  |   4 +
 .../http/processor/SendSyncMessageProcessor.java   |   4 +
 .../tcp/client/group/ClientGroupWrapper.java       |  32 ++-
 .../client/session/push/DownStreamMsgContext.java  | 101 +++++++---
 .../session/push/retry/EventMeshTcpRetryer.java    | 217 ---------------------
 .../client/session/retry/EventMeshTcpRetryer.java  | 127 ++++++++++++
 .../tcp/client/session/retry/RetryContext.java     |  57 ++++++
 .../tcp/client/session/send/SessionSender.java     |  15 +-
 .../client/session/send/UpStreamMsgContext.java    | 115 +++++++++--
 .../tcp/client/task/MessageTransferTask.java       |  22 ++-
 11 files changed, 397 insertions(+), 299 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index 4620ce7..6cbb130 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -44,7 +44,7 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcpMessage
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventMeshRebalanceService;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventmeshRebalanceImpl;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry.EventMeshTcpRetryer;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
 import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
 import org.apache.eventmesh.runtime.registry.Registry;
 import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index d64ef6b..3649dd0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -245,6 +245,8 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
                             SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
                                     EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(context.getException(), 2)));
                     asyncContext.onComplete(err, handler);
+
+                    eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
                     long endTime = System.currentTimeMillis();
                     eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
                     eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
@@ -261,6 +263,8 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
                     SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
                             EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2)));
             asyncContext.onComplete(err);
+
+            eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
             long endTime = System.currentTimeMillis();
             messageLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                     endTime - startTime,
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 5aa623d..38f3f25 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -254,6 +254,8 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
                             SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
                                     EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
                     asyncContext.onComplete(err, handler);
+
+                    eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
                     messageLogger.error("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                             System.currentTimeMillis() - startTime,
                             sendMessageRequestBody.getTopic(),
@@ -267,6 +269,8 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
                     SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
                             EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2)));
             asyncContext.onComplete(err);
+
+            eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
             long endTime = System.currentTimeMillis();
             eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
             eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 311d6eb..87f7b9b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -17,26 +17,8 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.group;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import com.alibaba.fastjson.JSON;
-
-import io.openmessaging.api.AsyncConsumeContext;
-import io.openmessaging.api.AsyncMessageListener;
-import io.openmessaging.api.Message;
-import io.openmessaging.api.OnExceptionContext;
-import io.openmessaging.api.SendCallback;
-import io.openmessaging.api.SendResult;
-
+import io.openmessaging.api.*;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.api.EventMeshAction;
@@ -53,7 +35,7 @@ import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry.EventMeshTcpRetryer;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext;
 import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
@@ -61,6 +43,12 @@ import org.apache.eventmesh.runtime.util.HttpTinyClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 public class ClientGroupWrapper {
 
     public static Logger logger = LoggerFactory.getLogger(ClientGroupWrapper.class);
@@ -752,7 +740,9 @@ public class ClientGroupWrapper {
             String topic = msg.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
             logger.warn("send msg back to broker, bizSeqno:{}, topic:{}", bizSeqNo, topic);
 
-            send(new UpStreamMsgContext(null, null, msg), new SendCallback() {
+            long startTime = System.currentTimeMillis();
+            long taskExcuteTime = startTime;
+            send(new UpStreamMsgContext(null, msg, null, startTime, taskExcuteTime), new SendCallback() {
                 @Override
                 public void onSuccess(SendResult sendResult) {
                     logger.info("consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, topic:{}", consumerGroup, bizSeqNo, topic);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
index 712e6fa..5b86ba3 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
@@ -17,45 +17,36 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
 import io.openmessaging.api.Message;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.RetryContext;
+import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.ServerGlobal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
 
-public class DownStreamMsgContext implements Delayed {
+public class DownStreamMsgContext extends RetryContext {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    public String seq;
-
-    public Message msgExt;
-
     public Session session;
 
     public AbstractContext consumeConcurrentlyContext;
 
     public MQConsumerWrapper consumer;
 
-    public int retryTimes;
-
     public SubscriptionItem subscriptionItem;
 
-    private long executeTime;
-
     public long lastPushTime;
 
     private long createTime;
@@ -68,11 +59,9 @@ public class DownStreamMsgContext implements Delayed {
         this.seq = String.valueOf(ServerGlobal.getInstance().getMsgCounter().incrementAndGet());
         this.msgExt = msgExt;
         this.session = session;
-        this.retryTimes = 0;
         this.consumer = consumer;
         this.consumeConcurrentlyContext = consumeConcurrentlyContext;
         this.lastPushTime = System.currentTimeMillis();
-        this.executeTime = System.currentTimeMillis();
         this.createTime = System.currentTimeMillis();
         this.subscriptionItem = subscriptionItem;
         String ttlStr = msgExt.getUserProperties("TTL");
@@ -99,10 +88,6 @@ public class DownStreamMsgContext implements Delayed {
         }
     }
 
-    public void delay(long delay) {
-        this.executeTime = System.currentTimeMillis() + (retryTimes + 1) * delay;
-    }
-
     @Override
     public String toString() {
         return "DownStreamMsgContext{" +
@@ -119,19 +104,71 @@ public class DownStreamMsgContext implements Delayed {
     }
 
     @Override
-    public int compareTo(Delayed delayed) {
-        DownStreamMsgContext context = (DownStreamMsgContext) delayed;
-        if (this.executeTime > context.executeTime) {
-            return 1;
-        } else if (this.executeTime == context.executeTime) {
-            return 0;
-        } else {
-            return -1;
+    public void retry() {
+        try {
+            logger.info("retry downStream msg start,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt));
+
+            if (isRetryMsgTimeout(this)) {
+                return;
+            }
+            this.retryTimes++;
+            this.lastPushTime = System.currentTimeMillis();
+
+            Session rechoosen = null;
+            String topic = this.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+            if (!SubscriptionMode.BROADCASTING.equals(this.subscriptionItem.getMode())) {
+                rechoosen = this.session.getClientGroupWrapper()
+                        .get().getDownstreamDispatchStrategy().select(this.session.getClientGroupWrapper().get().getSysId()
+                                , topic
+                                , this.session.getClientGroupWrapper().get().getGroupConsumerSessions());
+            } else {
+                rechoosen = this.session;
+            }
+
+            if (rechoosen == null) {
+                logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt));
+            } else {
+                this.session = rechoosen;
+                rechoosen.downstreamMsg(this);
+                logger.info("retry downStream msg end,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt));
+            }
+        } catch (Exception e) {
+            logger.error("retry-dispatcher error!", e);
         }
     }
 
-    @Override
-    public long getDelay(TimeUnit unit) {
-        return unit.convert(this.executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) {
+        boolean flag = false;
+        String ttlStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL);
+        long ttl = StringUtils.isNumeric(ttlStr)? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;;
+
+        String storeTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.STORE_TIME);
+        long storeTimestamp = StringUtils.isNumeric(storeTimeStr)? Long.parseLong(storeTimeStr) : 0;
+        String leaveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.LEAVE_TIME);
+        long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0;
+
+        String arriveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.ARRIVE_TIME);
+        long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.parseLong(arriveTimeStr) : 0;
+        double elapseTime = brokerCost + accessCost;
+        if (elapseTime >= ttl) {
+            logger.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
+            flag = true;
+            eventMeshAckMsg(downStreamMsgContext);
+        }
+        return flag;
+    }
+
+    /**
+     * eventMesh ack msg
+     *
+     * @param downStreamMsgContext
+     */
+    private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) {
+        List<Message> msgExts = new ArrayList<Message>();
+        msgExts.add(downStreamMsgContext.msgExt);
+        logger.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION),
+                downStreamMsgContext.seq, downStreamMsgContext.msgExt.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS));
+        downStreamMsgContext.consumer.updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext);
     }
+
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
deleted file mode 100644
index cf4e3b7..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry;
-
-import io.openmessaging.api.Message;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.common.protocol.SubscriptionType;
-import org.apache.eventmesh.common.protocol.SubscriptionMode;
-import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
-import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
-import org.apache.eventmesh.runtime.util.EventMeshUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class EventMeshTcpRetryer {
-
-    public static Logger logger = LoggerFactory.getLogger(EventMeshTcpRetryer.class);
-
-    private EventMeshTCPServer eventMeshTCPServer;
-
-    private DelayQueue<DownStreamMsgContext> retrys = new DelayQueue<DownStreamMsgContext>();
-
-    private ThreadPoolExecutor pool = new ThreadPoolExecutor(3,
-            3,
-            60000,
-            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000),
-            new EventMeshThreadFactoryImpl("eventMesh-tcp-retry", true),
-            new ThreadPoolExecutor.AbortPolicy());
-
-    private Thread dispatcher;
-
-    public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
-        this.eventMeshTCPServer = eventMeshTCPServer;
-    }
-
-    public EventMeshTCPServer getEventMeshTCPServer() {
-        return eventMeshTCPServer;
-    }
-
-    public void setEventMeshTCPServer(EventMeshTCPServer eventMeshTCPServer) {
-        this.eventMeshTCPServer = eventMeshTCPServer;
-    }
-
-    public void pushRetry(DownStreamMsgContext downStreamMsgContext) {
-        if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize) {
-            logger.error("pushRetry fail,retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}",
-                    eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize, downStreamMsgContext.retryTimes,
-                    downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-            return;
-        }
-
-        int maxRetryTimes = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())
-                ? eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes
-                : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
-        if (downStreamMsgContext.retryTimes >= maxRetryTimes) {
-            logger.warn("pushRetry fail,retry over maxRetryTimes:{}, pushType: {}, retryTimes:{}, seq:{}, bizSeq:{}", maxRetryTimes, downStreamMsgContext.subscriptionItem.getType(),
-                    downStreamMsgContext.retryTimes, downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-            return;
-        }
-
-        retrys.offer(downStreamMsgContext);
-        logger.info("pushRetry success,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes,
-                EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-    }
-
-    public void init() {
-        dispatcher = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    DownStreamMsgContext downStreamMsgContext = null;
-                    while ((downStreamMsgContext = retrys.take()) != null) {
-                        final DownStreamMsgContext finalDownStreamMsgContext = downStreamMsgContext;
-                        pool.execute(() -> {
-                            retryHandle(finalDownStreamMsgContext);
-                        });
-                    }
-                } catch (Exception e) {
-                    logger.error("retry-dispatcher error!", e);
-                }
-            }
-        }, "retry-dispatcher");
-        dispatcher.setDaemon(true);
-        logger.info("EventMeshTcpRetryer inited......");
-    }
-
-    private void retryHandle(DownStreamMsgContext downStreamMsgContext) {
-        try {
-            logger.info("retry downStream msg start,seq:{},retryTimes:{},bizSeq:{}", downStreamMsgContext.seq,
-                    downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-
-            if (isRetryMsgTimeout(downStreamMsgContext)) {
-                return;
-            }
-            downStreamMsgContext.retryTimes++;
-            downStreamMsgContext.lastPushTime = System.currentTimeMillis();
-
-            Session rechoosen = null;
-            String topic = downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
-            if (!SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
-                rechoosen = downStreamMsgContext.session.getClientGroupWrapper()
-                        .get().getDownstreamDispatchStrategy().select(downStreamMsgContext.session.getClientGroupWrapper().get().getSysId()
-                                , topic
-                                , downStreamMsgContext.session.getClientGroupWrapper().get().getGroupConsumerSessions());
-            } else {
-                rechoosen = downStreamMsgContext.session;
-            }
-
-            if (rechoosen == null) {
-                logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
-                        downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-
-//                //Need to manually ack the message that did not send a successful message
-//                eventMeshAckMsg(downStreamMsgContext);
-
-//                //Retry cannot find the delivered session, no longer post back to the broker or retry other event Mesh
-//                String bizSeqNo = finalDownStreamMsgContext.msgExt.getKeys();
-//                String uniqueId = MapUtils.getString(finalDownStreamMsgContext.msgExt.getProperties(), WeMQConstant.RMB_UNIQ_ID, "");
-//                if(EventMeshTCPServer.getAccessConfiguration().eventMeshTcpSendBackEnabled){
-//                    sendMsgBackToBroker(finalDownStreamMsgContext.msgExt, bizSeqNo, uniqueId);
-//                }else{
-//                    // TODO: Push the message to other EventMesh instances. To be determined.
-//                    sendMsgToOtherEventMesh(finalDownStreamMsgContext.msgExt, bizSeqNo, uniqueId);
-//                }
-            } else {
-                downStreamMsgContext.session = rechoosen;
-                rechoosen.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
-                rechoosen.downstreamMsg(downStreamMsgContext);
-                logger.info("retry downStream msg end,seq:{},retryTimes:{},bizSeq:{}", downStreamMsgContext.seq,
-                        downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-            }
-        } catch (Exception e) {
-            logger.error("retry-dispatcher error!", e);
-        }
-    }
-
-    private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) {
-        boolean flag = false;
-        String ttlStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL);
-        long ttl = StringUtils.isNumeric(ttlStr)? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;;
-
-        String storeTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.STORE_TIME);
-        long storeTimestamp = StringUtils.isNumeric(storeTimeStr)? Long.parseLong(storeTimeStr) : 0;
-        String leaveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.LEAVE_TIME);
-        long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0;
-
-        String arriveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.ARRIVE_TIME);
-        long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.parseLong(arriveTimeStr) : 0;
-        double elapseTime = brokerCost + accessCost;
-        if (elapseTime >= ttl) {
-            logger.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
-                    downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-            flag = true;
-            eventMeshAckMsg(downStreamMsgContext);
-        }
-        return flag;
-    }
-
-    public void start() throws Exception {
-        dispatcher.start();
-        logger.info("EventMeshTcpRetryer started......");
-    }
-
-    public void shutdown() {
-        pool.shutdown();
-        logger.info("EventMeshTcpRetryer shutdown......");
-    }
-
-    public int getRetrySize() {
-        return retrys.size();
-    }
-
-    /**
-     * eventMesh ack msg
-     *
-     * @param downStreamMsgContext
-     */
-    private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) {
-        List<Message> msgExts = new ArrayList<Message>();
-        msgExts.add(downStreamMsgContext.msgExt);
-        logger.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION),
-                downStreamMsgContext.seq, downStreamMsgContext.msgExt.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS));
-        downStreamMsgContext.consumer.updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext);
-//        ConsumeMessageService consumeMessageService = downStreamMsgContext.consumer.getDefaultMQPushConsumer().getDefaultMQPushConsumerImpl().getConsumeMessageService();
-//        ((ConsumeMessageConcurrentlyService)consumeMessageService).updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext);
-    }
-
-    public void printRetryThreadPoolState() {
-//        ThreadPoolHelper.printState(pool);
-    }
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
new file mode 100644
index 0000000..c43ef4f
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;
+
+import org.apache.eventmesh.common.protocol.SubscriptionType;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
+import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
+import org.apache.eventmesh.runtime.util.EventMeshUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Delayed retry scheduler for TCP {@link RetryContext}s.
+ *
+ * <p>Contexts wait in a {@link DelayQueue} until their {@code executeTime} is reached; a single daemon
+ * dispatcher thread then submits each due context to a fixed pool of three workers, which call
+ * {@link RetryContext#retry()}.
+ */
+public class EventMeshTcpRetryer {
+
+    public static final Logger logger = LoggerFactory.getLogger(EventMeshTcpRetryer.class);
+
+    private EventMeshTCPServer eventMeshTCPServer;
+
+    private final DelayQueue<RetryContext> retrys = new DelayQueue<RetryContext>();
+
+    // Bounded work queue + AbortPolicy: execute() throws RejectedExecutionException when 1000 retries
+    // are already waiting for a worker, or after shutdown(). dispatchLoop() handles that per context.
+    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(3,
+            3,
+            60000,
+            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000),
+            new EventMeshThreadFactoryImpl("eventMesh-tcp-retry", true),
+            new ThreadPoolExecutor.AbortPolicy());
+
+    private Thread dispatcher;
+
+    public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    public EventMeshTCPServer getEventMeshTCPServer() {
+        return eventMeshTCPServer;
+    }
+
+    public void setEventMeshTCPServer(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Queues a context for a delayed retry.
+     *
+     * <p>The context is dropped (and logged) when the queue already holds
+     * {@code eventMeshTcpMsgRetryQueueSize} entries, or when its {@code retryTimes} has reached the limit:
+     * {@code eventMeshTcpMsgSyncRetryTimes} for a {@link DownStreamMsgContext} with a
+     * {@link SubscriptionType#SYNC} subscription, {@code eventMeshTcpMsgAsyncRetryTimes} otherwise.
+     *
+     * @param retryContext context to retry; its {@code executeTime} decides when it becomes due
+     */
+    public void pushRetry(RetryContext retryContext) {
+        if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize) {
+            logger.error("pushRetry fail,retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}",
+                    eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize, retryContext.retryTimes,
+                    retryContext.seq, EventMeshUtil.getMessageBizSeq(retryContext.msgExt));
+            return;
+        }
+
+        int maxRetryTimes = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
+        if (retryContext instanceof DownStreamMsgContext) {
+            DownStreamMsgContext downStreamMsgContext = (DownStreamMsgContext) retryContext;
+            maxRetryTimes = SubscriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ?
+                    eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgSyncRetryTimes :
+                    eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgAsyncRetryTimes;
+        }
+
+        if (retryContext.retryTimes >= maxRetryTimes) {
+            logger.warn("pushRetry fail,retry over maxRetryTimes:{}, retryTimes:{}, seq:{}, bizSeq:{}", maxRetryTimes,
+                    retryContext.retryTimes, retryContext.seq, EventMeshUtil.getMessageBizSeq(retryContext.msgExt));
+            return;
+        }
+
+        retrys.offer(retryContext);
+        logger.info("pushRetry success,seq:{}, retryTimes:{}, bizSeq:{}", retryContext.seq, retryContext.retryTimes,
+                EventMeshUtil.getMessageBizSeq(retryContext.msgExt));
+    }
+
+    /**
+     * Creates, but does not start, the daemon dispatcher thread. Must be called before {@link #start()}.
+     */
+    public void init() {
+        dispatcher = new Thread(this::dispatchLoop, "retry-dispatcher");
+        dispatcher.setDaemon(true);
+        logger.info("EventMeshTcpRetryer inited......");
+    }
+
+    /**
+     * Dispatcher body: takes due contexts off the delay queue and submits each to the worker pool.
+     *
+     * <p>A rejected submission is logged and that single retry is dropped; it must not end the loop,
+     * otherwise every later retry would sit in the delay queue forever. The loop ends only when the
+     * thread is interrupted (see {@link #shutdown()}) or the pool has been shut down.
+     */
+    private void dispatchLoop() {
+        while (!Thread.currentThread().isInterrupted() && !pool.isShutdown()) {
+            RetryContext retryContext;
+            try {
+                retryContext = retrys.take();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+            try {
+                pool.execute(retryContext::retry);
+            } catch (RejectedExecutionException e) {
+                logger.error("retry-dispatcher rejected retry, seq:{}, retryTimes:{}", retryContext.seq,
+                        retryContext.retryTimes, e);
+            }
+        }
+        logger.info("retry-dispatcher stopped");
+    }
+
+    public void start() throws Exception {
+        dispatcher.start();
+        logger.info("EventMeshTcpRetryer started......");
+    }
+
+    /**
+     * Stops the dispatcher thread and shuts down the worker pool. Contexts still waiting in the delay
+     * queue are not retried; retries already handed to the pool are allowed to finish.
+     */
+    public void shutdown() {
+        if (dispatcher != null) {
+            dispatcher.interrupt();
+        }
+        pool.shutdown();
+        logger.info("EventMeshTcpRetryer shutdown......");
+    }
+
+    public int getRetrySize() {
+        return retrys.size();
+    }
+
+    /** Currently a no-op hook; the pool-state dump is disabled. */
+    public void printRetryThreadPoolState() {
+//        ThreadPoolHelper.printState(pool);
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/RetryContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/RetryContext.java
new file mode 100644
index 0000000..04d8674
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/RetryContext.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;
+
+import io.openmessaging.api.Message;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * State shared by every message scheduled through {@link EventMeshTcpRetryer}.
+ *
+ * <p>Instances are ordered by {@link #executeTime} so they can wait in a
+ * {@link java.util.concurrent.DelayQueue}; subclasses implement {@link #retry()} to perform one attempt.
+ */
+public abstract class RetryContext implements Delayed {
+
+    /** Message being retried. */
+    public Message msgExt;
+
+    /** Sequence id of the message, set by subclasses (e.g. from the TCP package header). */
+    public String seq;
+
+    /** Retry attempts already made; compared against the limit in {@link EventMeshTcpRetryer#pushRetry}. */
+    public int retryTimes = 0;
+
+    /** Wall-clock time in milliseconds at which this context becomes due. */
+    public long executeTime = System.currentTimeMillis();
+
+    /**
+     * Makes this context due {@code (retryTimes + 1) * delay} milliseconds from now, so the wait grows
+     * linearly with the number of attempts already made.
+     *
+     * @param delay base delay in milliseconds
+     * @return this context, for chaining
+     */
+    public RetryContext delay(long delay) {
+        this.executeTime = System.currentTimeMillis() + (retryTimes + 1) * delay;
+        return this;
+    }
+
+    /**
+     * Orders contexts by due time, earliest first.
+     *
+     * <p>Another {@code RetryContext} is compared by {@link #executeTime}; any other {@link Delayed} by its
+     * remaining delay, instead of failing with a {@link ClassCastException}.
+     */
+    @Override
+    public int compareTo(Delayed delayed) {
+        if (delayed instanceof RetryContext) {
+            return Long.compare(this.executeTime, ((RetryContext) delayed).executeTime);
+        }
+        return Long.compare(getDelay(TimeUnit.MILLISECONDS), delayed.getDelay(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Returns the time remaining until {@link #executeTime}, converted to {@code unit}; zero or negative
+     * once the context is due.
+     */
+    @Override
+    public long getDelay(TimeUnit unit) {
+        return unit.convert(this.executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    /** Performs one retry attempt. Called on an {@link EventMeshTcpRetryer} worker thread once due. */
+    public abstract void retry();
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
index 6378fa2..e9e1fc5 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.java
@@ -17,13 +17,8 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send;
 
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import io.openmessaging.api.Message;
 import io.openmessaging.api.SendCallback;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.eventmesh.api.RRCallback;
@@ -39,6 +34,10 @@ import org.apache.eventmesh.runtime.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class SessionSender {
 
     private final Logger messageLogger = LoggerFactory.getLogger("message");
@@ -81,7 +80,7 @@ public class SessionSender {
                 Command cmd = header.getCommand();
                 if (Command.REQUEST_TO_SERVER == cmd) {
                     long ttl = msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL) != null ? Long.parseLong(msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL)) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
-                    upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg);
+                    upStreamMsgContext = new UpStreamMsgContext(session, msg, header, startTime, taskExecuteTime);
                     session.getClientGroupWrapper().get().request(upStreamMsgContext, initSyncRRCallback(header, startTime, taskExecuteTime), ttl);
                     upstreamBuff.release();
                 } else if (Command.RESPONSE_TO_SERVER == cmd) {
@@ -98,11 +97,11 @@ public class SessionSender {
 //                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, msg.getProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID));
 //                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, msg.getProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO));
 
-                    upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg);
+                    upStreamMsgContext = new UpStreamMsgContext(session, msg, header, startTime, taskExecuteTime);
                     session.getClientGroupWrapper().get().reply(upStreamMsgContext);
                     upstreamBuff.release();
                 } else {
-                    upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg);
+                    upStreamMsgContext = new UpStreamMsgContext(session, msg, header, startTime, taskExecuteTime);
                     session.getClientGroupWrapper().get().send(upStreamMsgContext, sendCallback);
                 }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
index 1618ce7..48be15c 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
@@ -18,26 +18,43 @@
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send;
 
 import io.openmessaging.api.Message;
-
+import io.openmessaging.api.OnExceptionContext;
+import io.openmessaging.api.SendCallback;
+import io.openmessaging.api.SendResult;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.tcp.*;
+import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.RetryContext;
+import org.apache.eventmesh.runtime.util.EventMeshUtil;
+import org.apache.eventmesh.runtime.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpStreamMsgContext extends RetryContext {
 
-public class UpStreamMsgContext {
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     private Session session;
 
-    private Message msg;
+    private long createTime = System.currentTimeMillis();
 
-    private String seq;
+    private Header header;
 
-    private long createTime = System.currentTimeMillis();
+    private long startTime;
+
+    private long taskExecuteTime;
 
-    public UpStreamMsgContext(String seq, Session session, Message msg) {
-        this.seq = seq;
+    public UpStreamMsgContext(Session session, Message msg, Header header, long startTime, long taskExecuteTime) {
+        this.seq = header.getSeq();
         this.session = session;
-        this.msg = msg;
+        this.msgExt = msg;
+        this.header = header;
+        this.startTime = startTime;
+        this.taskExecuteTime = taskExecuteTime;
     }
 
     public Session getSession() {
@@ -45,7 +62,7 @@ public class UpStreamMsgContext {
     }
 
     public Message getMsg() {
-        return msg;
+        return msgExt;
     }
 
     public long getCreateTime() {
@@ -55,8 +72,105 @@ public class UpStreamMsgContext {
     @Override
     public String toString() {
         return "UpStreamMsgContext{seq=" + seq
-                + ",topic=" + msg.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)
+                + ",topic=" + msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)
                 + ",client=" + session.getClient()
-                + ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + "}";
+                + ",retryTimes=" + retryTimes
+                + ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT)
+                + ",executeTime=" + DateFormatUtils.format(executeTime, EventMeshConstants.DATE_FORMAT) + "}";
+    }
+
+    /**
+     * Re-sends {@link #msgExt} upstream once, reusing the original header and timing information.
+     *
+     * <p>Failures reported through the send callback are handled by {@link #createSendCallback}, which
+     * schedules the next attempt. A synchronous failure (non-SUCCESS send status or an exception) is only
+     * logged here; this method does not itself schedule another retry.
+     */
+    @Override
+    public void retry() {
+        logger.info("retry upStream msg start,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes, EventMeshUtil.getMessageBizSeq(this.msgExt));
+
+        try {
+            Command replyCmd = getReplyCmd(header.getCommand());
+            long sendTime = System.currentTimeMillis();
+            EventMeshMessage eventMeshMessage = EventMeshUtil.encodeMessage(msgExt);
+            EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, msgExt,
+                    createSendCallback(replyCmd, taskExecuteTime, eventMeshMessage), startTime, taskExecuteTime);
+
+            if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
+                logger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", header.getCommand(), EventMeshUtil.printMqMessage
+                        (eventMeshMessage), session.getClient(), taskExecuteTime - startTime, sendTime - startTime);
+            } else {
+                throw new Exception(sendStatus.getDetail());
+            }
+        } catch (Exception e) {
+            logger.error("TCP UpstreamMsg Retry error", e);
+        }
+    }
+
+    /**
+     * Builds the callback for one upstream send attempt.
+     *
+     * <p>On success an ACK is written back to the client for async/broadcast commands. On failure a new
+     * context for the same message is queued on the client group's retryer with {@code retryTimes}
+     * incremented, and a FAIL reply is written back to the client.
+     *
+     * <p>NOTE(review): both paths release a permit of the sender's upstreamBuff, while {@link #retry()}
+     * sends via {@code session.upstreamMsg} directly; confirm that path acquires a permit, otherwise each
+     * retry releases one that was never taken.
+     */
+    protected SendCallback createSendCallback(Command replyCmd, long taskExecuteTime, EventMeshMessage eventMeshMessage) {
+        final long createTime = System.currentTimeMillis();
+        Package msg = new Package();
+
+        return new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                session.getSender().getUpstreamBuff().release();
+                logger.info("upstreamMsg message success|user={}|callback cost={}", session.getClient(),
+                        String.valueOf(System.currentTimeMillis() - createTime));
+                if (replyCmd.equals(Command.BROADCAST_MESSAGE_TO_SERVER_ACK) || replyCmd.equals(Command
+                        .ASYNC_MESSAGE_TO_SERVER_ACK)) {
+                    msg.setHeader(new Header(replyCmd, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), seq));
+                    msg.setBody(eventMeshMessage);
+                    Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session);
+                }
+            }
+
+            @Override
+            public void onException(OnExceptionContext context) {
+                session.getSender().getUpstreamBuff().release();
+
+                // retry: carry the attempt count forward. A fresh context starts at retryTimes = 0, so
+                // without this pushRetry's max-retry check never trips and a failing send retries forever.
+                UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext(
+                        session, EventMeshUtil.decodeMessage(eventMeshMessage), header, startTime, taskExecuteTime);
+                upStreamMsgContext.retryTimes = UpStreamMsgContext.this.retryTimes + 1;
+                upStreamMsgContext.delay(10000);
+                session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(upStreamMsgContext);
+
+                session.getSender().failMsgCount.incrementAndGet();
+                logger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(), String.valueOf
+                        (System.currentTimeMillis() - createTime), new Exception(context.getException()));
+                msg.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode(), context.getException().toString(), seq));
+                msg.setBody(eventMeshMessage);
+                Utils.writeAndFlush(msg, startTime, taskExecuteTime, session.getContext(), session);
+            }
+
+        };
+    }
+
+    /** Maps an upstream request command to the command used when replying to the client. */
+    private Command getReplyCmd(Command cmd) {
+        switch (cmd) {
+            case REQUEST_TO_SERVER:
+                return Command.RESPONSE_TO_CLIENT;
+            case ASYNC_MESSAGE_TO_SERVER:
+                return Command.ASYNC_MESSAGE_TO_SERVER_ACK;
+            case BROADCAST_MESSAGE_TO_SERVER:
+                return Command.BROADCAST_MESSAGE_TO_SERVER_ACK;
+            default:
+                return cmd;
+        }
     }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index 87eeed0..b0c79fa 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -17,34 +17,31 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
 
-import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER;
-
-import java.util.concurrent.TimeUnit;
-
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.openmessaging.api.OnExceptionContext;
 import io.openmessaging.api.SendCallback;
 import io.openmessaging.api.SendResult;
-
 import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.OPStatus;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.tcp.*;
 import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendResult;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendStatus;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.runtime.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER;
+
 public class MessageTransferTask extends AbstractTask {
 
     private final Logger messageLogger = LoggerFactory.getLogger("message");
@@ -158,6 +155,13 @@ public class MessageTransferTask extends AbstractTask {
             @Override
             public void onException(OnExceptionContext context) {
                 session.getSender().getUpstreamBuff().release();
+
+                // retry
+                UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext(
+                        session, EventMeshUtil.decodeMessage(eventMeshMessage), pkg.getHeader(), startTime, taskExecuteTime);
+                upStreamMsgContext.delay(10000);
+                session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(upStreamMsgContext);
+
                 session.getSender().failMsgCount.incrementAndGet();
                 messageLogger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", session.getClient(), String.valueOf
                         (System.currentTimeMillis() - createTime), new Exception(context.getException()));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org