You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/20 10:48:05 UTC
[iotdb] 01/01: change the condition to trigger retry in QueryExecution
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_retry_condition
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8069b4a54c7bcf60a4eafb1dbaca03f6182abc2a
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Jul 20 18:47:47 2022 +0800
change the condition to trigger retry in QueryExecution
---
.../iotdb/db/mpp/execution/QueryStateMachine.java | 4 ++--
.../iotdb/db/mpp/plan/scheduler/ClusterScheduler.java | 17 +++++++++++------
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 42e793b41f..44f16e2ad9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -100,12 +100,12 @@ public class QueryStateMachine {
queryState.set(QueryState.DISPATCHING);
}
- public void transitionToRetrying(Throwable throwable) {
+ public void transitionToRetrying(TSStatus failureStatus) {
if (queryState.get().isDone()) {
return;
}
+ this.failureStatus = failureStatus;
queryState.set(QueryState.RETRYING);
- this.failureException = throwable;
}
public void transitionToRunning() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index bd0e166c36..f2d36f093d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -29,6 +30,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.rpc.TSStatusCode;
import io.airlift.units.Duration;
import org.slf4j.Logger;
@@ -89,6 +91,11 @@ public class ClusterScheduler implements IScheduler {
}
}
+ private boolean needRetry(TSStatus failureStatus) {
+ return failureStatus != null
+ && failureStatus.getCode() == TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode();
+ }
+
@Override
public void start() {
stateMachine.transitionToDispatching();
@@ -99,12 +106,10 @@ public class ClusterScheduler implements IScheduler {
try {
FragInstanceDispatchResult result = dispatchResultFuture.get();
if (!result.isSuccessful()) {
- if (result.getFailureStatus() != null) {
- stateMachine.transitionToFailed(result.getFailureStatus());
+ if (needRetry(result.getFailureStatus())) {
+ stateMachine.transitionToRetrying(result.getFailureStatus());
} else {
- // won't get into here
- stateMachine.transitionToFailed(
- new IllegalStateException("Fragment cannot be dispatched"));
+ stateMachine.transitionToFailed(result.getFailureStatus());
}
return;
}
@@ -113,7 +118,7 @@ public class ClusterScheduler implements IScheduler {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- stateMachine.transitionToRetrying(e);
+ stateMachine.transitionToFailed(e);
return;
}