You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/09/28 02:43:17 UTC
[incubator-celeborn] branch main updated: [CELEBORN-1008] Adjust push/fetch timeout checker thread pool and tasks
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ef4fc51d5 [CELEBORN-1008] Adjust push/fetch timeout checker thread pool and tasks
ef4fc51d5 is described below
commit ef4fc51d5f2d9d1a707e06cac2c64f3a07082e42
Author: onebox-li <ly...@163.com>
AuthorDate: Thu Sep 28 10:43:08 2023 +0800
[CELEBORN-1008] Adjust push/fetch timeout checker thread pool and tasks
### What changes were proposed in this pull request?
Only the push and data modules need the push-timeout-checker, and only the data module needs the fetch-timeout-checker.
This change stops the push-timeout-checker from being created in the Master and LifeCycleManager, and stops the fetch-timeout-checker from being created in the Worker.
The same applies to the related scheduled timeout-checking tasks, which are now only scheduled in modules that need them.
### Why are the changes needed?
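See above: this avoids creating timeout-checker thread pools and scheduled tasks in modules that never use them.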
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested on a cluster.
Closes #1940 from onebox-li/checker-dev.
Authored-by: onebox-li <ly...@163.com>
Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
.../network/client/TransportResponseHandler.java | 76 +++++++++++++++-------
1 file changed, 51 insertions(+), 25 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index 7282fa913..78120a2c1 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -79,31 +79,49 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
this.timeOfLastRequestNs = new AtomicLong(0);
this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
+
+ String module = conf.getModuleName();
+ boolean checkPushTimeout = false;
+ boolean checkFetchTimeout = false;
+ if (TransportModuleConstants.DATA_MODULE.equals(module)) {
+ checkPushTimeout = true;
+ checkFetchTimeout = true;
+ } else if (TransportModuleConstants.PUSH_MODULE.equals(module)) {
+ checkPushTimeout = true;
+ }
synchronized (TransportResponseHandler.class) {
- if (pushTimeoutChecker == null) {
- pushTimeoutChecker =
- ThreadUtils.newDaemonThreadPoolScheduledExecutor(
- "push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
- }
- if (fetchTimeoutChecker == null) {
- fetchTimeoutChecker =
- ThreadUtils.newDaemonThreadPoolScheduledExecutor(
- "fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads());
+ if (checkPushTimeout) {
+ if (pushTimeoutChecker == null) {
+ pushTimeoutChecker =
+ ThreadUtils.newDaemonThreadPoolScheduledExecutor(
+ "push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
+ }
+ if (checkFetchTimeout) {
+ if (fetchTimeoutChecker == null) {
+ fetchTimeoutChecker =
+ ThreadUtils.newDaemonThreadPoolScheduledExecutor(
+ "fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads());
+ }
+ }
}
}
- pushCheckerScheduleFuture =
- pushTimeoutChecker.scheduleAtFixedRate(
- () -> failExpiredPushRequest(),
- pushTimeoutCheckerInterval,
- pushTimeoutCheckerInterval,
- TimeUnit.MILLISECONDS);
+ if (checkPushTimeout) {
+ pushCheckerScheduleFuture =
+ pushTimeoutChecker.scheduleAtFixedRate(
+ () -> failExpiredPushRequest(),
+ pushTimeoutCheckerInterval,
+ pushTimeoutCheckerInterval,
+ TimeUnit.MILLISECONDS);
+ }
- fetchCheckerScheduleFuture =
- fetchTimeoutChecker.scheduleAtFixedRate(
- () -> failExpiredFetchRequest(),
- fetchTimeoutCheckerInterval,
- fetchTimeoutCheckerInterval,
- TimeUnit.MILLISECONDS);
+ if (checkFetchTimeout) {
+ fetchCheckerScheduleFuture =
+ fetchTimeoutChecker.scheduleAtFixedRate(
+ () -> failExpiredFetchRequest(),
+ fetchTimeoutCheckerInterval,
+ fetchTimeoutCheckerInterval,
+ TimeUnit.MILLISECONDS);
+ }
}
public void failExpiredPushRequest() {
@@ -251,8 +269,12 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
remoteAddress);
failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
}
- pushCheckerScheduleFuture.cancel(false);
- fetchCheckerScheduleFuture.cancel(false);
+ if (pushCheckerScheduleFuture != null) {
+ pushCheckerScheduleFuture.cancel(false);
+ }
+ if (fetchCheckerScheduleFuture != null) {
+ fetchCheckerScheduleFuture.cancel(false);
+ }
}
@Override
@@ -265,8 +287,12 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
remoteAddress);
failOutstandingRequests(cause);
}
- pushCheckerScheduleFuture.cancel(false);
- fetchCheckerScheduleFuture.cancel(false);
+ if (pushCheckerScheduleFuture != null) {
+ pushCheckerScheduleFuture.cancel(false);
+ }
+ if (fetchCheckerScheduleFuture != null) {
+ fetchCheckerScheduleFuture.cancel(false);
+ }
}
@Override