Posted to issues@celeborn.apache.org by "waitinfuture (via GitHub)" <gi...@apache.org> on 2023/02/13 08:51:26 UTC

[GitHub] [incubator-celeborn] waitinfuture commented on a diff in pull request #1208: [CELEBORN-273] Move push data timeout checker into TransportResponseHandler to keep callback status consistence

waitinfuture commented on code in PR #1208:
URL: https://github.com/apache/incubator-celeborn/pull/1208#discussion_r1104071511


##########
common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java:
##########
@@ -156,32 +156,34 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
     return requestId;
   }
 
-  public ChannelFuture pushData(PushData pushData, RpcResponseCallback callback) {
+  public ChannelFuture pushData(
+      PushData pushData, long pushDataTimeout, RpcResponseCallback callback) {
     if (logger.isTraceEnabled()) {
       logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
     }
 
     long requestId = requestId();
-    handler.addPushRequest(requestId, callback);
-
     pushData.requestId = requestId;
 
     PushChannelListener listener = new PushChannelListener(requestId, callback);
-    return channel.writeAndFlush(pushData).addListener(listener);
+    ChannelFuture channelFuture = channel.writeAndFlush(pushData).addListener(listener);
+    handler.addPushRequest(requestId, channelFuture, pushDataTimeout, callback);

Review Comment:
   The channelFuture can complete before the request is registered through addPushRequest. In that case the timeout callback will later be invoked on a request that actually succeeded. We should call addPushRequest before writeAndFlush, and set the future on the registered request afterwards.
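
The ordering suggested above can be sketched roughly as follows. This is a minimal illustration, not the Celeborn implementation: `PushRegistry`, `PendingPush`, `push`, `complete` and `outstandingCount` are hypothetical names, and `CompletableFuture` stands in for Netty's `ChannelFuture` so the example runs without Netty on the classpath.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the handler's bookkeeping of outstanding pushes.
class PushRegistry {
  static final class PendingPush {
    final long pushTime;
    final long timeoutMs;
    // Attached after the write is issued; volatile so a checker thread sees it.
    // A timeout checker must tolerate this still being null.
    volatile CompletableFuture<Void> future;

    PendingPush(long pushTime, long timeoutMs) {
      this.pushTime = pushTime;
      this.timeoutMs = timeoutMs;
    }
  }

  private final Map<Long, PendingPush> outstanding = new ConcurrentHashMap<>();

  CompletableFuture<Void> push(
      long requestId, long now, long timeoutMs, Supplier<CompletableFuture<Void>> write) {
    // 1. Register first, so a response that arrives immediately finds the entry.
    PendingPush pending = new PendingPush(now, timeoutMs);
    outstanding.put(requestId, pending);
    // 2. Issue the write (assumed to play the role of channel.writeAndFlush).
    CompletableFuture<Void> future = write.get();
    // 3. Attach the future to the already registered entry.
    pending.future = future;
    return future;
  }

  // Response path: returns null if the request was already handled elsewhere.
  PendingPush complete(long requestId) {
    return outstanding.remove(requestId);
  }

  int outstandingCount() {
    return outstanding.size();
  }
}
```

With this ordering, a write whose response is handled before `push` returns leaves no stale entry behind for the timeout checker to fail later.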



##########
common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java:
##########
@@ -39,22 +48,73 @@
 public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   private static final Logger logger = LoggerFactory.getLogger(TransportResponseHandler.class);
 
+  private final TransportConf conf;
   private final Channel channel;
 
   private final Map<StreamChunkSlice, ChunkReceivedCallback> outstandingFetches;
 
   private final Map<Long, RpcResponseCallback> outstandingRpcs;
-  private final Map<Long, RpcResponseCallback> outstandingPushes;
+  private final Map<Long, PushRequestInfo> outstandingPushes;
 
   /** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
   private final AtomicLong timeOfLastRequestNs;
 
-  public TransportResponseHandler(Channel channel) {
+  private final ScheduledExecutorService pushTimeoutChecker;
+  private final long pushTimeoutCheckerInterval;
+
+  public TransportResponseHandler(TransportConf conf, Channel channel) {
+    this.conf = conf;
     this.channel = channel;
     this.outstandingFetches = new ConcurrentHashMap<>();
     this.outstandingRpcs = new ConcurrentHashMap<>();
     this.outstandingPushes = new ConcurrentHashMap<>();
     this.timeOfLastRequestNs = new AtomicLong(0);
+    pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
+    pushTimeoutChecker =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("push-timeout-checker-" + this);
+    pushTimeoutChecker.scheduleAtFixedRate(
+        new Runnable() {
+          @Override
+          public void run() {
+            failExpiredPushRequest();
+          }
+        },
+        pushTimeoutCheckerInterval,
+        pushTimeoutCheckerInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  public void failExpiredPushRequest() {
+    long currentTime = System.currentTimeMillis();
+    for (Map.Entry<Long, PushRequestInfo> entry : outstandingPushes.entrySet()) {
+      try {
+        long requestId = entry.getKey();
+        PushRequestInfo info = entry.getValue();
+        if (info.pushDataTimeout > 0) {
+          if (info.pushTime != -1 && (currentTime - info.pushTime > info.pushDataTimeout)) {
+            if (info.callback != null) {
+              if (!info.channelFuture.isCancelled()) {
+                info.channelFuture.cancel(true);
+                // When module name equals to DATA_MODULE, mean shuffle client push data, else means
+                // do data replication.
+                if (Objects.equals(conf.getModuleName(), TransportModuleConstants.DATA_MODULE)) {
+                  info.callback.onFailure(
+                      new IOException(StatusCode.PUSH_DATA_TIMEOUT_MASTER.getMessage()));
+                } else {

Review Comment:
   else if (Objects.equals(conf.getModuleName(), TransportModuleConstants.PUSH_MODULE))



##########
common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java:
##########
@@ -39,22 +48,73 @@
 public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   private static final Logger logger = LoggerFactory.getLogger(TransportResponseHandler.class);
 
+  private final TransportConf conf;
   private final Channel channel;
 
   private final Map<StreamChunkSlice, ChunkReceivedCallback> outstandingFetches;
 
   private final Map<Long, RpcResponseCallback> outstandingRpcs;
-  private final Map<Long, RpcResponseCallback> outstandingPushes;
+  private final Map<Long, PushRequestInfo> outstandingPushes;
 
   /** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
   private final AtomicLong timeOfLastRequestNs;
 
-  public TransportResponseHandler(Channel channel) {
+  private final ScheduledExecutorService pushTimeoutChecker;
+  private final long pushTimeoutCheckerInterval;
+
+  public TransportResponseHandler(TransportConf conf, Channel channel) {
+    this.conf = conf;
     this.channel = channel;
     this.outstandingFetches = new ConcurrentHashMap<>();
     this.outstandingRpcs = new ConcurrentHashMap<>();
     this.outstandingPushes = new ConcurrentHashMap<>();
     this.timeOfLastRequestNs = new AtomicLong(0);
+    pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
+    pushTimeoutChecker =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("push-timeout-checker-" + this);
+    pushTimeoutChecker.scheduleAtFixedRate(
+        new Runnable() {
+          @Override
+          public void run() {
+            failExpiredPushRequest();
+          }
+        },
+        pushTimeoutCheckerInterval,
+        pushTimeoutCheckerInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  public void failExpiredPushRequest() {
+    long currentTime = System.currentTimeMillis();
+    for (Map.Entry<Long, PushRequestInfo> entry : outstandingPushes.entrySet()) {
+      try {
+        long requestId = entry.getKey();
+        PushRequestInfo info = entry.getValue();
+        if (info.pushDataTimeout > 0) {
+          if (info.pushTime != -1 && (currentTime - info.pushTime > info.pushDataTimeout)) {
+            if (info.callback != null) {

Review Comment:
   There can be concurrency issues here. First, we should use an entry iterator instead of ```entry : outstandingPushes.entrySet```. Second, we should remove the entry from outstandingPushes here. Third, we should check the return value of remove and do the remaining work only if it is not null.
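
A rough sketch of the remove-then-act pattern described in this comment, under stated assumptions: `ExpiredPushSweeper`, `Info` and `failExpired` are hypothetical names, the map is a `ConcurrentHashMap` as in the diff, and the callback invocation is replaced by collecting the failed request ids so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the timeout sweep in the response handler.
class ExpiredPushSweeper {
  static final class Info {
    final long pushTime;
    final long timeoutMs;

    Info(long pushTime, long timeoutMs) {
      this.pushTime = pushTime;
      this.timeoutMs = timeoutMs;
    }
  }

  final Map<Long, Info> outstanding = new ConcurrentHashMap<>();

  // Returns the ids this sweep actually failed; real code would call
  // callback.onFailure(...) at the point where the id is collected.
  List<Long> failExpired(long now) {
    List<Long> failed = new ArrayList<>();
    Iterator<Map.Entry<Long, Info>> it = outstanding.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Info> entry = it.next();
      Info info = entry.getValue();
      if (info.timeoutMs > 0 && now - info.pushTime > info.timeoutMs) {
        // remove returns the previous value, or null if another thread
        // (for example the response path) removed the entry first.
        Info removed = outstanding.remove(entry.getKey());
        if (removed != null) {
          failed.add(entry.getKey());
        }
      }
    }
    return failed;
  }
}
```

Because only the thread whose `remove` returns a non-null value proceeds, a request is failed at most once even if the response handler and the checker race on the same entry.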


