You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/09/28 02:43:17 UTC

[incubator-celeborn] branch main updated: [CELEBORN-1008] Adjust push/fetch timeout checker thread pool and tasks

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ef4fc51d5 [CELEBORN-1008] Adjust push/fetch timeout checker thread pool and tasks
ef4fc51d5 is described below

commit ef4fc51d5f2d9d1a707e06cac2c64f3a07082e42
Author: onebox-li <ly...@163.com>
AuthorDate: Thu Sep 28 10:43:08 2023 +0800

    [CELEBORN-1008] Adjust push/fetch timeout checker thread pool and tasks
    
    ### What changes were proposed in this pull request?
    Only the push and data modules need the push-timeout-checker, and only the data module needs the fetch-timeout-checker.
    This change stops the push-timeout-checker from being created in Master/LifeCycleManager, and the fetch-timeout-checker from being created in Worker.
    The same applies to the corresponding scheduled timeout-check tasks.
    
    ### Why are the changes needed?
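    Ditto: timeout-checker thread pools and their scheduled tasks should only be created in the modules that need them.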
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Cluster test
    
    Closes #1940 from onebox-li/checker-dev.
    
    Authored-by: onebox-li <ly...@163.com>
    Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
 .../network/client/TransportResponseHandler.java   | 76 +++++++++++++++-------
 1 file changed, 51 insertions(+), 25 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index 7282fa913..78120a2c1 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -79,31 +79,49 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
     this.timeOfLastRequestNs = new AtomicLong(0);
     this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
     this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
+
+    String module = conf.getModuleName();
+    boolean checkPushTimeout = false;
+    boolean checkFetchTimeout = false;
+    if (TransportModuleConstants.DATA_MODULE.equals(module)) {
+      checkPushTimeout = true;
+      checkFetchTimeout = true;
+    } else if (TransportModuleConstants.PUSH_MODULE.equals(module)) {
+      checkPushTimeout = true;
+    }
     synchronized (TransportResponseHandler.class) {
-      if (pushTimeoutChecker == null) {
-        pushTimeoutChecker =
-            ThreadUtils.newDaemonThreadPoolScheduledExecutor(
-                "push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
-      }
-      if (fetchTimeoutChecker == null) {
-        fetchTimeoutChecker =
-            ThreadUtils.newDaemonThreadPoolScheduledExecutor(
-                "fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads());
+      if (checkPushTimeout) {
+        if (pushTimeoutChecker == null) {
+          pushTimeoutChecker =
+              ThreadUtils.newDaemonThreadPoolScheduledExecutor(
+                  "push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
+        }
+        if (checkFetchTimeout) {
+          if (fetchTimeoutChecker == null) {
+            fetchTimeoutChecker =
+                ThreadUtils.newDaemonThreadPoolScheduledExecutor(
+                    "fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads());
+          }
+        }
       }
     }
-    pushCheckerScheduleFuture =
-        pushTimeoutChecker.scheduleAtFixedRate(
-            () -> failExpiredPushRequest(),
-            pushTimeoutCheckerInterval,
-            pushTimeoutCheckerInterval,
-            TimeUnit.MILLISECONDS);
+    if (checkPushTimeout) {
+      pushCheckerScheduleFuture =
+          pushTimeoutChecker.scheduleAtFixedRate(
+              () -> failExpiredPushRequest(),
+              pushTimeoutCheckerInterval,
+              pushTimeoutCheckerInterval,
+              TimeUnit.MILLISECONDS);
+    }
 
-    fetchCheckerScheduleFuture =
-        fetchTimeoutChecker.scheduleAtFixedRate(
-            () -> failExpiredFetchRequest(),
-            fetchTimeoutCheckerInterval,
-            fetchTimeoutCheckerInterval,
-            TimeUnit.MILLISECONDS);
+    if (checkFetchTimeout) {
+      fetchCheckerScheduleFuture =
+          fetchTimeoutChecker.scheduleAtFixedRate(
+              () -> failExpiredFetchRequest(),
+              fetchTimeoutCheckerInterval,
+              fetchTimeoutCheckerInterval,
+              TimeUnit.MILLISECONDS);
+    }
   }
 
   public void failExpiredPushRequest() {
@@ -251,8 +269,12 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
           remoteAddress);
       failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
     }
-    pushCheckerScheduleFuture.cancel(false);
-    fetchCheckerScheduleFuture.cancel(false);
+    if (pushCheckerScheduleFuture != null) {
+      pushCheckerScheduleFuture.cancel(false);
+    }
+    if (fetchCheckerScheduleFuture != null) {
+      fetchCheckerScheduleFuture.cancel(false);
+    }
   }
 
   @Override
@@ -265,8 +287,12 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
           remoteAddress);
       failOutstandingRequests(cause);
     }
-    pushCheckerScheduleFuture.cancel(false);
-    fetchCheckerScheduleFuture.cancel(false);
+    if (pushCheckerScheduleFuture != null) {
+      pushCheckerScheduleFuture.cancel(false);
+    }
+    if (fetchCheckerScheduleFuture != null) {
+      fetchCheckerScheduleFuture.cancel(false);
+    }
   }
 
   @Override