You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/03/25 10:57:35 UTC

[GitHub] [incubator-inlong] baomingyu commented on a change in pull request #3357: [INLONG-3352][DataProxy] Fix DataProxy keeps trying to send messages that were sent failed

baomingyu commented on a change in pull request #3357:
URL: https://github.com/apache/incubator-inlong/pull/3357#discussion_r835163115



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
##########
@@ -723,6 +727,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
             SocketAddress remoteSocketAddress = remoteChannel.remoteAddress();
             if (!checkMessageTopic || !MessageUtils.isSyncSendForOrder(commonAttrMap
                     .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+                logger.debug("response 11111!");

Review comment:
       done

##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/EventStat.java
##########
@@ -17,20 +17,29 @@
 
 package org.apache.inlong.dataproxy.sink;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.base.OrderEvent;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
 
 public class EventStat {
+
+    private static long RETRY_INTERVAL_MS = 1 * 1000L;
     private Event event;
-    private int myRetryCnt;
+    private AtomicInteger myRetryCnt = new AtomicInteger(0);

Review comment:
       The retry mechanism will be redesigned later




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org