You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/17 09:53:10 UTC

[iotdb] branch master updated: Pipe: fix pipe procedure stuck because of data node async request forever waiting for response (#11157)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bdb6bd6da90 Pipe: fix pipe procedure stuck because of data node async request forever waiting for response (#11157)
bdb6bd6da90 is described below

commit bdb6bd6da90c317dc40da8f43e2eeca4d2a75af8
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Sep 17 17:53:05 2023 +0800

    Pipe: fix pipe procedure stuck because of data node async request forever waiting for response (#11157)
---
 .../client/async/AsyncDataNodeClientPool.java      | 45 ++++++++++++++++++----
 .../manager/load/balancer/RouteBalancer.java       |  2 +-
 .../pipe/runtime/PipeHeartbeatScheduler.java       |  8 +++-
 .../procedure/env/ConfigNodeProcedureEnv.java      | 23 ++++++-----
 4 files changed, 59 insertions(+), 19 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 7e8e52e6ff7..139a340079a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -76,6 +76,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+
 /** Asynchronously send RPC requests to DataNodes. See queryengine.thrift for more details. */
 public class AsyncDataNodeClientPool {
 
@@ -92,6 +94,19 @@ public class AsyncDataNodeClientPool {
                 new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
   }
 
+  /**
+   * Send asynchronous requests to the specified DataNodes with default retry num and a timeout
+   *
+   * <p>Notice: The DataNodes that failed to receive the requests will be reconnected
+   *
+   * @param clientHandler <RequestType, ResponseType> which will also contain the result
+   * @param timeoutInMs max time in milliseconds to wait for responses in each retry round
+   */
+  public void sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+      AsyncClientHandler<?, ?> clientHandler, long timeoutInMs) {
+    sendAsyncRequest(clientHandler, MAX_RETRY_NUM, timeoutInMs);
+  }
+
   /**
    * Send asynchronous requests to the specified DataNodes with default retry num
    *
@@ -100,15 +115,15 @@ public class AsyncDataNodeClientPool {
    * @param clientHandler <RequestType, ResponseType> which will also contain the result
    */
   public void sendAsyncRequestToDataNodeWithRetry(AsyncClientHandler<?, ?> clientHandler) {
-    sendAsyncRequest(clientHandler, MAX_RETRY_NUM);
+    sendAsyncRequest(clientHandler, MAX_RETRY_NUM, null);
   }
 
-  public void sendAsyncRequestToDataNodeWithRetry(
-      AsyncClientHandler<?, ?> clientHandler, int retryNum) {
-    sendAsyncRequest(clientHandler, retryNum);
+  public void sendAsyncRequestToDataNode(AsyncClientHandler<?, ?> clientHandler) {
+    sendAsyncRequest(clientHandler, 1, null);
   }
 
-  private void sendAsyncRequest(AsyncClientHandler<?, ?> clientHandler, int retryNum) {
+  private void sendAsyncRequest(
+      AsyncClientHandler<?, ?> clientHandler, int retryNum, Long timeoutInMs) {
     if (clientHandler.getRequestIndices().isEmpty()) {
       return;
     }
@@ -126,9 +141,17 @@ public class AsyncDataNodeClientPool {
 
       // Wait for this batch of asynchronous RPC requests finish
       try {
-        clientHandler.getCountDownLatch().await();
+        if (timeoutInMs == null) {
+          clientHandler.getCountDownLatch().await();
+        } else {
+          if (!clientHandler.getCountDownLatch().await(timeoutInMs, TimeUnit.MILLISECONDS)) {
+            LOGGER.warn(
+                "Timeout during {} on ConfigNode. Retry: {}/{}", requestType, retry, retryNum);
+          }
+        }
       } catch (InterruptedException e) {
-        LOGGER.error("Interrupted during {} on ConfigNode", requestType);
+        LOGGER.error(
+            "Interrupted during {} on ConfigNode. Retry: {}/{}", requestType, retry, retryNum);
         Thread.currentThread().interrupt();
       }
 
@@ -137,6 +160,14 @@ public class AsyncDataNodeClientPool {
         return;
       }
     }
+
+    if (!clientHandler.getRequestIndices().isEmpty()) {
+      LOGGER.warn(
+          "Failed to {} on ConfigNode after {} retries, requestIndices: {}",
+          requestType,
+          retryNum,
+          clientHandler.getRequestIndices());
+    }
   }
 
   private void sendAsyncRequestToDataNode(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 6380045be5b..f8f1c92484f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -183,7 +183,7 @@ public class RouteBalancer {
         });
     if (requestId.get() > 0) {
       // Don't retry ChangeLeader request
-      AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler, 1);
+      AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNode(clientHandler);
     }
     return differentRegionLeaderMap;
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
index 9b832b351dd..04ecb0fcf0c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
@@ -89,7 +89,13 @@ public class PipeHeartbeatScheduler {
 
     final AsyncClientHandler<TPipeHeartbeatReq, TPipeHeartbeatResp> clientHandler =
         new AsyncClientHandler<>(DataNodeRequestType.PIPE_HEARTBEAT, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+            clientHandler,
+            PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()
+                * 1000L
+                * 2
+                / 3);
     clientHandler
         .getResponseMap()
         .forEach(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 7111595c927..b9c9aaa8d3e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.NodeType;
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
@@ -97,9 +98,6 @@ public class ConfigNodeProcedureEnv {
   /** Add or remove node lock. */
   private final LockQueue nodeLock = new LockQueue();
 
-  /** Pipe operation lock. */
-  private final LockQueue pipeLock = new LockQueue();
-
   private final ReentrantLock schedulerLock = new ReentrantLock(true);
 
   private final ConfigManager configManager;
@@ -680,7 +678,10 @@ public class ConfigNodeProcedureEnv {
     final AsyncClientHandler<TPushPipeMetaReq, TPushPipeMetaResp> clientHandler =
         new AsyncClientHandler<>(
             DataNodeRequestType.PIPE_PUSH_ALL_META, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+            clientHandler,
+            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3);
     return clientHandler.getResponseMap();
   }
 
@@ -692,7 +693,10 @@ public class ConfigNodeProcedureEnv {
     final AsyncClientHandler<TPushSinglePipeMetaReq, TPushPipeMetaResp> clientHandler =
         new AsyncClientHandler<>(
             DataNodeRequestType.PIPE_PUSH_SINGLE_META, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+            clientHandler,
+            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2);
     return clientHandler.getResponseMap();
   }
 
@@ -705,7 +709,10 @@ public class ConfigNodeProcedureEnv {
     final AsyncClientHandler<TPushSinglePipeMetaReq, TPushPipeMetaResp> clientHandler =
         new AsyncClientHandler<>(
             DataNodeRequestType.PIPE_PUSH_SINGLE_META, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+            clientHandler,
+            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2);
     return clientHandler.getResponseMap();
   }
 
@@ -713,10 +720,6 @@ public class ConfigNodeProcedureEnv {
     return nodeLock;
   }
 
-  public LockQueue getPipeLock() {
-    return pipeLock;
-  }
-
   public ProcedureScheduler getScheduler() {
     return scheduler;
   }