You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this text version.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/17 10:03:53 UTC

[iotdb] branch 1.2-fix-pipe-lock-req created (now 58fdd3b1f50)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch 1.2-fix-pipe-lock-req
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 58fdd3b1f50 Pipe: fix pipe procedure stuck because of data node async request forever waiting for response (#11157)

This branch includes the following new commits:

     new 58fdd3b1f50 Pipe: fix pipe procedure stuck because of data node async request forever waiting for response (#11157)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Pipe: fix pipe procedure stuck because of data node async request forever waiting for response (#11157)

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch 1.2-fix-pipe-lock-req
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 58fdd3b1f5055343e2052e8a1681b17d6c98a024
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Sep 17 17:53:05 2023 +0800

    Pipe: fix pipe procedure stuck because of data node async request forever waiting for response (#11157)
    
    (cherry picked from commit bdb6bd6da90c317dc40da8f43e2eeca4d2a75af8)
---
 .../client/async/AsyncDataNodeClientPool.java      | 45 ++++++++++++++++++----
 .../manager/load/balancer/RouteBalancer.java       |  2 +-
 .../pipe/runtime/PipeHeartbeatScheduler.java       |  8 +++-
 .../procedure/env/ConfigNodeProcedureEnv.java      | 23 ++++++-----
 4 files changed, 59 insertions(+), 19 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 7e8e52e6ff7..139a340079a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -76,6 +76,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+
 /** Asynchronously send RPC requests to DataNodes. See queryengine.thrift for more details. */
 public class AsyncDataNodeClientPool {
 
@@ -92,6 +94,19 @@ public class AsyncDataNodeClientPool {
                 new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
   }
 
+  /**
+   * Send asynchronous requests to the specified DataNodes with default retry num and a wait timeout
+   *
+   * <p>Notice: The DataNodes that failed to receive the requests will be reconnected
+   *
+   * @param clientHandler <RequestType, ResponseType> which will also contain the result
+   * @param timeoutInMs max time to wait for responses in each retry round, in milliseconds
+   */
+  public void sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+      AsyncClientHandler<?, ?> clientHandler, long timeoutInMs) {
+    sendAsyncRequest(clientHandler, MAX_RETRY_NUM, timeoutInMs);
+  }
+
   /**
    * Send asynchronous requests to the specified DataNodes with default retry num
    *
@@ -100,15 +115,15 @@ public class AsyncDataNodeClientPool {
    * @param clientHandler <RequestType, ResponseType> which will also contain the result
    */
   public void sendAsyncRequestToDataNodeWithRetry(AsyncClientHandler<?, ?> clientHandler) {
-    sendAsyncRequest(clientHandler, MAX_RETRY_NUM);
+    sendAsyncRequest(clientHandler, MAX_RETRY_NUM, null);
   }
 
-  public void sendAsyncRequestToDataNodeWithRetry(
-      AsyncClientHandler<?, ?> clientHandler, int retryNum) {
-    sendAsyncRequest(clientHandler, retryNum);
+  public void sendAsyncRequestToDataNode(AsyncClientHandler<?, ?> clientHandler) {
+    sendAsyncRequest(clientHandler, 1, null);
   }
 
-  private void sendAsyncRequest(AsyncClientHandler<?, ?> clientHandler, int retryNum) {
+  private void sendAsyncRequest(
+      AsyncClientHandler<?, ?> clientHandler, int retryNum, Long timeoutInMs) {
     if (clientHandler.getRequestIndices().isEmpty()) {
       return;
     }
@@ -126,9 +141,17 @@ public class AsyncDataNodeClientPool {
 
       // Wait for this batch of asynchronous RPC requests finish
       try {
-        clientHandler.getCountDownLatch().await();
+        if (timeoutInMs == null) {
+          clientHandler.getCountDownLatch().await();
+        } else {
+          if (!clientHandler.getCountDownLatch().await(timeoutInMs, TimeUnit.MILLISECONDS)) {
+            LOGGER.warn(
+                "Timeout during {} on ConfigNode. Retry: {}/{}", requestType, retry, retryNum);
+          }
+        }
       } catch (InterruptedException e) {
-        LOGGER.error("Interrupted during {} on ConfigNode", requestType);
+        LOGGER.error(
+            "Interrupted during {} on ConfigNode. Retry: {}/{}", requestType, retry, retryNum);
         Thread.currentThread().interrupt();
       }
 
@@ -137,6 +160,14 @@ public class AsyncDataNodeClientPool {
         return;
       }
     }
+
+    if (!clientHandler.getRequestIndices().isEmpty()) {
+      LOGGER.warn(
+          "Failed to {} on ConfigNode after {} retries, requestIndices: {}",
+          requestType,
+          retryNum,
+          clientHandler.getRequestIndices());
+    }
   }
 
   private void sendAsyncRequestToDataNode(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 6380045be5b..f8f1c92484f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -183,7 +183,7 @@ public class RouteBalancer {
         });
     if (requestId.get() > 0) {
       // Don't retry ChangeLeader request
-      AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler, 1);
+      AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNode(clientHandler);
     }
     return differentRegionLeaderMap;
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
index 9b832b351dd..04ecb0fcf0c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
@@ -89,7 +89,13 @@ public class PipeHeartbeatScheduler {
 
     final AsyncClientHandler<TPipeHeartbeatReq, TPipeHeartbeatResp> clientHandler =
         new AsyncClientHandler<>(DataNodeRequestType.PIPE_HEARTBEAT, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+            clientHandler,
+            PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()
+                * 1000L
+                * 2
+                / 3);
     clientHandler
         .getResponseMap()
         .forEach(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 7111595c927..b9c9aaa8d3e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.NodeType;
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
@@ -97,9 +98,6 @@ public class ConfigNodeProcedureEnv {
   /** Add or remove node lock. */
   private final LockQueue nodeLock = new LockQueue();
 
-  /** Pipe operation lock. */
-  private final LockQueue pipeLock = new LockQueue();
-
   private final ReentrantLock schedulerLock = new ReentrantLock(true);
 
   private final ConfigManager configManager;
@@ -680,7 +678,10 @@ public class ConfigNodeProcedureEnv {
     final AsyncClientHandler<TPushPipeMetaReq, TPushPipeMetaResp> clientHandler =
         new AsyncClientHandler<>(
             DataNodeRequestType.PIPE_PUSH_ALL_META, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+            clientHandler,
+            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3);
     return clientHandler.getResponseMap();
   }
 
@@ -692,7 +693,10 @@ public class ConfigNodeProcedureEnv {
     final AsyncClientHandler<TPushSinglePipeMetaReq, TPushPipeMetaResp> clientHandler =
         new AsyncClientHandler<>(
             DataNodeRequestType.PIPE_PUSH_SINGLE_META, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+            clientHandler,
+            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2);
     return clientHandler.getResponseMap();
   }
 
@@ -705,7 +709,10 @@ public class ConfigNodeProcedureEnv {
     final AsyncClientHandler<TPushSinglePipeMetaReq, TPushPipeMetaResp> clientHandler =
         new AsyncClientHandler<>(
             DataNodeRequestType.PIPE_PUSH_SINGLE_META, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    AsyncDataNodeClientPool.getInstance()
+        .sendAsyncRequestToDataNodeWithRetryAndTimeoutInMs(
+            clientHandler,
+            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2);
     return clientHandler.getResponseMap();
   }
 
@@ -713,10 +720,6 @@ public class ConfigNodeProcedureEnv {
     return nodeLock;
   }
 
-  public LockQueue getPipeLock() {
-    return pipeLock;
-  }
-
   public ProcedureScheduler getScheduler() {
     return scheduler;
   }