You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/20 10:48:05 UTC

[iotdb] 01/01: change the condition to trigger a retry in QueryExecution

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_retry_condition
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8069b4a54c7bcf60a4eafb1dbaca03f6182abc2a
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Jul 20 18:47:47 2022 +0800

    change the condition to trigger a retry in QueryExecution
---
 .../iotdb/db/mpp/execution/QueryStateMachine.java       |  4 ++--
 .../iotdb/db/mpp/plan/scheduler/ClusterScheduler.java   | 17 +++++++++++------
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 42e793b41f..44f16e2ad9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -100,12 +100,12 @@ public class QueryStateMachine {
     queryState.set(QueryState.DISPATCHING);
   }
 
-  public void transitionToRetrying(Throwable throwable) {
+  public void transitionToRetrying(TSStatus failureStatus) {
     if (queryState.get().isDone()) {
       return;
     }
+    this.failureStatus = failureStatus;
     queryState.set(QueryState.RETRYING);
-    this.failureException = throwable;
   }
 
   public void transitionToRunning() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index bd0e166c36..f2d36f093d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -29,6 +30,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import io.airlift.units.Duration;
 import org.slf4j.Logger;
@@ -89,6 +91,11 @@ public class ClusterScheduler implements IScheduler {
     }
   }
 
+  private boolean needRetry(TSStatus failureStatus) {
+    return failureStatus != null
+        && failureStatus.getCode() == TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode();
+  }
+
   @Override
   public void start() {
     stateMachine.transitionToDispatching();
@@ -99,12 +106,10 @@ public class ClusterScheduler implements IScheduler {
     try {
       FragInstanceDispatchResult result = dispatchResultFuture.get();
       if (!result.isSuccessful()) {
-        if (result.getFailureStatus() != null) {
-          stateMachine.transitionToFailed(result.getFailureStatus());
+        if (needRetry(result.getFailureStatus())) {
+          stateMachine.transitionToRetrying(result.getFailureStatus());
         } else {
-          // won't get into here
-          stateMachine.transitionToFailed(
-              new IllegalStateException("Fragment cannot be dispatched"));
+          stateMachine.transitionToFailed(result.getFailureStatus());
         }
         return;
       }
@@ -113,7 +118,7 @@ public class ClusterScheduler implements IScheduler {
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
       }
-      stateMachine.transitionToRetrying(e);
+      stateMachine.transitionToFailed(e);
       return;
     }