You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/12 06:09:56 UTC

[iotdb] 01/01: leverage client RPC to do the retry logic rather than threadPool

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_retry_deadlock
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bffa8d12dc43feb0471c5b707c730149bcb2edbc
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Aug 12 14:09:41 2022 +0800

    leverage client RPC to do the retry logic rather than threadPool
---
 .../apache/iotdb/db/mpp/execution/QueryState.java  |  2 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  8 ++++++--
 .../db/mpp/plan/execution/QueryExecution.java      | 23 ++++++++++++++--------
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  2 +-
 4 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index d4ed15d9ea..8cd69763c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -28,7 +28,7 @@ public enum QueryState {
   QUEUED(false),
   PLANNED(false),
   DISPATCHING(false),
-  RETRYING(false),
+  PENDING_RETRY(false),
   RUNNING(false),
   FINISHED(true),
   CANCELED(true),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 44f16e2ad9..703255afe7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -92,6 +92,10 @@ public class QueryStateMachine {
     return queryState.get();
   }
 
+  public void transitionToQueued() {
+    queryState.set(QueryState.QUEUED);
+  }
+
   public void transitionToPlanned() {
     queryState.set(QueryState.PLANNED);
   }
@@ -100,12 +104,12 @@ public class QueryStateMachine {
     queryState.set(QueryState.DISPATCHING);
   }
 
-  public void transitionToRetrying(TSStatus failureStatus) {
+  public void transitionToPendingRetry(TSStatus failureStatus) {
     if (queryState.get().isDone()) {
       return;
     }
     this.failureStatus = failureStatus;
-    queryState.set(QueryState.RETRYING);
+    queryState.set(QueryState.PENDING_RETRY);
   }
 
   public void transitionToRunning() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 842352a7b6..4a6ac5fdc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -144,10 +144,6 @@ public class QueryExecution implements IQueryExecution {
     stateMachine.addStateChangeListener(
         state -> {
           try (SetThreadName queryName = new SetThreadName(context.getQueryId().getId())) {
-            if (state == QueryState.RETRYING) {
-              retry();
-              return;
-            }
             if (!state.isDone()) {
               return;
             }
@@ -179,17 +175,18 @@ public class QueryExecution implements IQueryExecution {
 
     doLogicalPlan();
     doDistributedPlan();
+    stateMachine.transitionToPlanned();
     if (context.getQueryType() == QueryType.READ) {
       initResultHandle();
     }
     schedule();
   }
 
-  private void retry() {
+  private ExecutionResult retry() {
     if (retryCount >= MAX_RETRY_COUNT) {
       logger.error("reach max retry count. transit query to failed");
       stateMachine.transitionToFailed();
-      return;
+      return getStatus();
     }
     logger.warn("error when executing query. {}", stateMachine.getFailureMessage());
     // stop and clean up resources the QueryExecution used
@@ -203,13 +200,14 @@ public class QueryExecution implements IQueryExecution {
     }
     retryCount++;
     logger.info("start to retry. Retry count is: {}", retryCount);
-
+    stateMachine.transitionToQueued();
     // force invalid PartitionCache
     partitionFetcher.invalidAllCache();
     // re-analyze the query
     this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
     // re-start the QueryExecution
     this.start();
+    return getStatus();
   }
 
   private boolean skipExecute() {
@@ -407,11 +405,20 @@ public class QueryExecution implements IQueryExecution {
       SettableFuture<QueryState> future = SettableFuture.create();
       stateMachine.addStateChangeListener(
           state -> {
-            if (state == QueryState.RUNNING || state.isDone()) {
+            if (state == QueryState.RUNNING
+                || state.isDone()
+                || state == QueryState.PENDING_RETRY) {
               future.set(state);
             }
           });
       QueryState state = future.get();
+      if (state == QueryState.PENDING_RETRY) {
+        // retry() is invoked here so that it runs on the client RPC thread instead of
+        // on a newly created thread.
+        // This makes the call recursive, because retry() itself calls getStatus().
+        // The maximum recursion depth is equal to the max retry count.
+        return retry();
+      }
       // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
       return getExecutionResult(state);
     } catch (InterruptedException | ExecutionException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index e9829cc637..417b6b5e18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -106,7 +106,7 @@ public class ClusterScheduler implements IScheduler {
       FragInstanceDispatchResult result = dispatchResultFuture.get();
       if (!result.isSuccessful()) {
         if (needRetry(result.getFailureStatus())) {
-          stateMachine.transitionToRetrying(result.getFailureStatus());
+          stateMachine.transitionToPendingRetry(result.getFailureStatus());
         } else {
           stateMachine.transitionToFailed(result.getFailureStatus());
         }