You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/12 06:09:56 UTC
[iotdb] 01/01: leverage client RPC to do the retry logic rather than threadPool
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/fix_retry_deadlock
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bffa8d12dc43feb0471c5b707c730149bcb2edbc
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Aug 12 14:09:41 2022 +0800
leverage client RPC to do the retry logic rather than threadPool
---
.../apache/iotdb/db/mpp/execution/QueryState.java | 2 +-
.../iotdb/db/mpp/execution/QueryStateMachine.java | 8 ++++++--
.../db/mpp/plan/execution/QueryExecution.java | 23 ++++++++++++++--------
.../db/mpp/plan/scheduler/ClusterScheduler.java | 2 +-
4 files changed, 23 insertions(+), 12 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index d4ed15d9ea..8cd69763c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -28,7 +28,7 @@ public enum QueryState {
QUEUED(false),
PLANNED(false),
DISPATCHING(false),
- RETRYING(false),
+ PENDING_RETRY(false),
RUNNING(false),
FINISHED(true),
CANCELED(true),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 44f16e2ad9..703255afe7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -92,6 +92,10 @@ public class QueryStateMachine {
return queryState.get();
}
+ public void transitionToQueued() {
+ queryState.set(QueryState.QUEUED);
+ }
+
public void transitionToPlanned() {
queryState.set(QueryState.PLANNED);
}
@@ -100,12 +104,12 @@ public class QueryStateMachine {
queryState.set(QueryState.DISPATCHING);
}
- public void transitionToRetrying(TSStatus failureStatus) {
+ public void transitionToPendingRetry(TSStatus failureStatus) {
if (queryState.get().isDone()) {
return;
}
this.failureStatus = failureStatus;
- queryState.set(QueryState.RETRYING);
+ queryState.set(QueryState.PENDING_RETRY);
}
public void transitionToRunning() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 842352a7b6..4a6ac5fdc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -144,10 +144,6 @@ public class QueryExecution implements IQueryExecution {
stateMachine.addStateChangeListener(
state -> {
try (SetThreadName queryName = new SetThreadName(context.getQueryId().getId())) {
- if (state == QueryState.RETRYING) {
- retry();
- return;
- }
if (!state.isDone()) {
return;
}
@@ -179,17 +175,18 @@ public class QueryExecution implements IQueryExecution {
doLogicalPlan();
doDistributedPlan();
+ stateMachine.transitionToPlanned();
if (context.getQueryType() == QueryType.READ) {
initResultHandle();
}
schedule();
}
- private void retry() {
+ private ExecutionResult retry() {
if (retryCount >= MAX_RETRY_COUNT) {
logger.error("reach max retry count. transit query to failed");
stateMachine.transitionToFailed();
- return;
+ return getStatus();
}
logger.warn("error when executing query. {}", stateMachine.getFailureMessage());
// stop and clean up resources the QueryExecution used
@@ -203,13 +200,14 @@ public class QueryExecution implements IQueryExecution {
}
retryCount++;
logger.info("start to retry. Retry count is: {}", retryCount);
-
+ stateMachine.transitionToQueued();
// force invalid PartitionCache
partitionFetcher.invalidAllCache();
// re-analyze the query
this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
// re-start the QueryExecution
this.start();
+ return getStatus();
}
private boolean skipExecute() {
@@ -407,11 +405,20 @@ public class QueryExecution implements IQueryExecution {
SettableFuture<QueryState> future = SettableFuture.create();
stateMachine.addStateChangeListener(
state -> {
- if (state == QueryState.RUNNING || state.isDone()) {
+ if (state == QueryState.RUNNING
+ || state.isDone()
+ || state == QueryState.PENDING_RETRY) {
future.set(state);
}
});
QueryState state = future.get();
+ if (state == QueryState.PENDING_RETRY) {
+ // retry() is invoked here so that it runs on the client RPC thread instead of
+ // on a newly created thread.
+ // This makes the call recursive, because retry() itself calls getStatus().
+ // The maximum recursion depth equals the maximum retry count.
+ return retry();
+ }
// TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
return getExecutionResult(state);
} catch (InterruptedException | ExecutionException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index e9829cc637..417b6b5e18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -106,7 +106,7 @@ public class ClusterScheduler implements IScheduler {
FragInstanceDispatchResult result = dispatchResultFuture.get();
if (!result.isSuccessful()) {
if (needRetry(result.getFailureStatus())) {
- stateMachine.transitionToRetrying(result.getFailureStatus());
+ stateMachine.transitionToPendingRetry(result.getFailureStatus());
} else {
stateMachine.transitionToFailed(result.getFailureStatus());
}