You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/17 09:53:10 UTC
[iotdb] branch master updated: Pipe: fix pipe procedure stuck because of data node async request forever waiting for response (#11157)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bdb6bd6da90 Pipe: fix pipe procedure stuck because of data node async request forever waiting for response (#11157)
bdb6bd6da90 is described below
commit bdb6bd6da90c317dc40da8f43e2eeca4d2a75af8
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Sep 17 17:53:05 2023 +0800
Pipe: fix pipe procedure stuck because of data node async request forever waiting for response (#11157)
---
.../client/async/AsyncDataNodeClientPool.java | 45 ++++++++++++++++++----
.../manager/load/balancer/RouteBalancer.java | 2 +-
.../pipe/runtime/PipeHeartbeatScheduler.java | 8 +++-
.../procedure/env/ConfigNodeProcedureEnv.java | 23 ++++++-----
4 files changed, 59 insertions(+), 19 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 7e8e52e6ff7..139a340079a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -76,6 +76,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+
/** Asynchronously send RPC requests to DataNodes. See queryengine.thrift for more details. */
public class AsyncDataNodeClientPool {
@@ -92,6 +94,19 @@ public class AsyncDataNodeClientPool {
new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
}
+ /**
+ * Send asynchronous requests to the specified DataNodes with default retry num, waiting at most timeoutInMs for each attempt
+ *
+ * <p>Notice: The DataNodes that failed to receive the requests will be reconnected
+ *
+ * @param clientHandler <RequestType, ResponseType> which will also contain the result
+ * @param timeoutInMs timeout in milliseconds
+ */
+ public void sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+ AsyncClientHandler<?, ?> clientHandler, long timeoutInMs) {
+ sendAsyncRequest(clientHandler, MAX_RETRY_NUM, timeoutInMs);
+ }
+
/**
* Send asynchronous requests to the specified DataNodes with default retry num
*
@@ -100,15 +115,15 @@ public class AsyncDataNodeClientPool {
* @param clientHandler <RequestType, ResponseType> which will also contain the result
*/
public void sendAsyncRequestToDataNodeWithRetry(AsyncClientHandler<?, ?> clientHandler) {
- sendAsyncRequest(clientHandler, MAX_RETRY_NUM);
+ sendAsyncRequest(clientHandler, MAX_RETRY_NUM, null);
}
- public void sendAsyncRequestToDataNodeWithRetry(
- AsyncClientHandler<?, ?> clientHandler, int retryNum) {
- sendAsyncRequest(clientHandler, retryNum);
+ public void sendAsyncRequestToDataNode(AsyncClientHandler<?, ?> clientHandler) {
+ sendAsyncRequest(clientHandler, 1, null);
}
- private void sendAsyncRequest(AsyncClientHandler<?, ?> clientHandler, int retryNum) {
+ private void sendAsyncRequest(
+ AsyncClientHandler<?, ?> clientHandler, int retryNum, Long timeoutInMs) {
if (clientHandler.getRequestIndices().isEmpty()) {
return;
}
@@ -126,9 +141,17 @@ public class AsyncDataNodeClientPool {
// Wait for this batch of asynchronous RPC requests finish
try {
- clientHandler.getCountDownLatch().await();
+ if (timeoutInMs == null) {
+ clientHandler.getCountDownLatch().await();
+ } else {
+ if (!clientHandler.getCountDownLatch().await(timeoutInMs, TimeUnit.MILLISECONDS)) {
+ LOGGER.warn(
+ "Timeout during {} on ConfigNode. Retry: {}/{}", requestType, retry, retryNum);
+ }
+ }
} catch (InterruptedException e) {
- LOGGER.error("Interrupted during {} on ConfigNode", requestType);
+ LOGGER.error(
+ "Interrupted during {} on ConfigNode. Retry: {}/{}", requestType, retry, retryNum);
Thread.currentThread().interrupt();
}
@@ -137,6 +160,14 @@ public class AsyncDataNodeClientPool {
return;
}
}
+
+ if (!clientHandler.getRequestIndices().isEmpty()) {
+ LOGGER.warn(
+ "Failed to {} on ConfigNode after {} retries, requestIndices: {}",
+ requestType,
+ retryNum,
+ clientHandler.getRequestIndices());
+ }
}
private void sendAsyncRequestToDataNode(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 6380045be5b..f8f1c92484f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -183,7 +183,7 @@ public class RouteBalancer {
});
if (requestId.get() > 0) {
// Don't retry ChangeLeader request
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler, 1);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNode(clientHandler);
}
return differentRegionLeaderMap;
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
index 9b832b351dd..04ecb0fcf0c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
@@ -89,7 +89,13 @@ public class PipeHeartbeatScheduler {
final AsyncClientHandler<TPipeHeartbeatReq, TPipeHeartbeatResp> clientHandler =
new AsyncClientHandler<>(DataNodeRequestType.PIPE_HEARTBEAT, request, dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+ clientHandler,
+ PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()
+ * 1000L
+ * 2
+ / 3);
clientHandler
.getResponseMap()
.forEach(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 7111595c927..b9c9aaa8d3e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
@@ -97,9 +98,6 @@ public class ConfigNodeProcedureEnv {
/** Add or remove node lock. */
private final LockQueue nodeLock = new LockQueue();
- /** Pipe operation lock. */
- private final LockQueue pipeLock = new LockQueue();
-
private final ReentrantLock schedulerLock = new ReentrantLock(true);
private final ConfigManager configManager;
@@ -680,7 +678,10 @@ public class ConfigNodeProcedureEnv {
final AsyncClientHandler<TPushPipeMetaReq, TPushPipeMetaResp> clientHandler =
new AsyncClientHandler<>(
DataNodeRequestType.PIPE_PUSH_ALL_META, request, dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+ clientHandler,
+ PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3);
return clientHandler.getResponseMap();
}
@@ -692,7 +693,10 @@ public class ConfigNodeProcedureEnv {
final AsyncClientHandler<TPushSinglePipeMetaReq, TPushPipeMetaResp> clientHandler =
new AsyncClientHandler<>(
DataNodeRequestType.PIPE_PUSH_SINGLE_META, request, dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+ clientHandler,
+ PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2);
return clientHandler.getResponseMap();
}
@@ -705,7 +709,10 @@ public class ConfigNodeProcedureEnv {
final AsyncClientHandler<TPushSinglePipeMetaReq, TPushPipeMetaResp> clientHandler =
new AsyncClientHandler<>(
DataNodeRequestType.PIPE_PUSH_SINGLE_META, request, dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ AsyncDataNodeClientPool.getInstance()
+ .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+ clientHandler,
+ PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2);
return clientHandler.getResponseMap();
}
@@ -713,10 +720,6 @@ public class ConfigNodeProcedureEnv {
return nodeLock;
}
- public LockQueue getPipeLock() {
- return pipeLock;
- }
-
public ProcedureScheduler getScheduler() {
return scheduler;
}