You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@celeborn.apache.org by "AngersZhuuuu (via GitHub)" <gi...@apache.org> on 2023/02/15 03:34:48 UTC

[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1208: [CELEBORN-273] Move push data timeout checker into TransportResponseHandler to keep callback status consistence

AngersZhuuuu commented on code in PR #1208:
URL: https://github.com/apache/incubator-celeborn/pull/1208#discussion_r1106611338


##########
common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java:
##########
@@ -39,22 +47,62 @@
 public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   private static final Logger logger = LoggerFactory.getLogger(TransportResponseHandler.class);
 
+  private final TransportConf conf;
   private final Channel channel;
 
   private final Map<StreamChunkSlice, ChunkReceivedCallback> outstandingFetches;
 
   private final Map<Long, RpcResponseCallback> outstandingRpcs;
-  private final Map<Long, RpcResponseCallback> outstandingPushes;
+  private final ConcurrentHashMap<Long, PushRequestInfo> outstandingPushes;
 
   /** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
   private final AtomicLong timeOfLastRequestNs;
 
-  public TransportResponseHandler(Channel channel) {
+  private final ScheduledExecutorService pushTimeoutChecker;
+  private final long pushTimeoutCheckerInterval;
+
+  public TransportResponseHandler(TransportConf conf, Channel channel) {
+    this.conf = conf;
     this.channel = channel;
     this.outstandingFetches = new ConcurrentHashMap<>();
     this.outstandingRpcs = new ConcurrentHashMap<>();
     this.outstandingPushes = new ConcurrentHashMap<>();
     this.timeOfLastRequestNs = new AtomicLong(0);
+    pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
+    pushTimeoutChecker =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("push-timeout-checker-" + this);
+    pushTimeoutChecker.scheduleAtFixedRate(
+        () -> failExpiredPushRequest(),
+        pushTimeoutCheckerInterval,
+        pushTimeoutCheckerInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  public void failExpiredPushRequest() {
+    long currentTime = System.currentTimeMillis();
+    Iterator<Map.Entry<Long, PushRequestInfo>> iter = outstandingPushes.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<Long, PushRequestInfo> entry = iter.next();
+      if (entry.getValue().dueTime <= currentTime) {

Review Comment:
   After fixing the incorrect logic of this condition, the test case worked as expected.



##########
common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java:
##########
@@ -39,22 +47,62 @@
 public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   private static final Logger logger = LoggerFactory.getLogger(TransportResponseHandler.class);
 
+  private final TransportConf conf;
   private final Channel channel;
 
   private final Map<StreamChunkSlice, ChunkReceivedCallback> outstandingFetches;
 
   private final Map<Long, RpcResponseCallback> outstandingRpcs;
-  private final Map<Long, RpcResponseCallback> outstandingPushes;
+  private final ConcurrentHashMap<Long, PushRequestInfo> outstandingPushes;
 
   /** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
   private final AtomicLong timeOfLastRequestNs;
 
-  public TransportResponseHandler(Channel channel) {
+  private final ScheduledExecutorService pushTimeoutChecker;
+  private final long pushTimeoutCheckerInterval;
+
+  public TransportResponseHandler(TransportConf conf, Channel channel) {
+    this.conf = conf;
     this.channel = channel;
     this.outstandingFetches = new ConcurrentHashMap<>();
     this.outstandingRpcs = new ConcurrentHashMap<>();
     this.outstandingPushes = new ConcurrentHashMap<>();
     this.timeOfLastRequestNs = new AtomicLong(0);
+    pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
+    pushTimeoutChecker =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("push-timeout-checker-" + this);
+    pushTimeoutChecker.scheduleAtFixedRate(
+        () -> failExpiredPushRequest(),
+        pushTimeoutCheckerInterval,
+        pushTimeoutCheckerInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  public void failExpiredPushRequest() {
+    long currentTime = System.currentTimeMillis();
+    Iterator<Map.Entry<Long, PushRequestInfo>> iter = outstandingPushes.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<Long, PushRequestInfo> entry = iter.next();
+      if (entry.getValue().dueTime <= currentTime) {

Review Comment:
   After fixing the incorrect logic of this condition, the test case worked as expected. cc @waitinfuture



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org