You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/25 00:12:38 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] Fix concurrent state change bug in QueryStateMachine
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new fb1d426630 [To rel/1.1] Fix concurrent state change bug in QueryStateMachine
fb1d426630 is described below
commit fb1d42663065d46a10a895ccc6c1959400692162
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Tue Apr 25 08:12:32 2023 +0800
[To rel/1.1] Fix concurrent state change bug in QueryStateMachine
---
.../iotdb/db/mpp/execution/QueryStateMachine.java | 78 ++++++++++------------
.../execution/exchange/source/SourceHandle.java | 31 ++++++++-
.../db/mpp/execution/QueryStateMachineTest.java | 2 +
3 files changed, 66 insertions(+), 45 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 53a52bd940..ffc174cbce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -27,12 +27,23 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.QueryState.ABORTED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.CANCELED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.DISPATCHING;
+import static org.apache.iotdb.db.mpp.execution.QueryState.FAILED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.FINISHED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.PENDING_RETRY;
+import static org.apache.iotdb.db.mpp.execution.QueryState.PLANNED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.QUEUED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.RUNNING;
+
/**
* State machine for a QueryExecution. It stores the states for the QueryExecution. Others can
* register listeners when the state changes of the QueryExecution.
*/
public class QueryStateMachine {
- private final String name;
private final StateMachine<QueryState> queryState;
// The executor will be used in all the state machines belonged to this query.
@@ -41,13 +52,12 @@ public class QueryStateMachine {
private TSStatus failureStatus;
public QueryStateMachine(QueryId queryId, ExecutorService executor) {
- this.name = String.format("QueryStateMachine[%s]", queryId);
this.stateMachineExecutor = executor;
this.queryState =
new StateMachine<>(
queryId.toString(),
this.stateMachineExecutor,
- QueryState.QUEUED,
+ QUEUED,
QueryState.TERMINAL_INSTANCE_STATES);
}
@@ -60,89 +70,71 @@ public class QueryStateMachine {
return queryState.getStateChange(currentState);
}
- private String getName() {
- return name;
- }
-
public QueryState getState() {
return queryState.get();
}
public void transitionToQueued() {
- queryState.set(QueryState.QUEUED);
+ queryState.set(QUEUED);
}
public void transitionToPlanned() {
- queryState.set(QueryState.PLANNED);
+ queryState.setIf(PLANNED, currentState -> currentState == QUEUED);
}
public void transitionToDispatching() {
- queryState.set(QueryState.DISPATCHING);
+ queryState.setIf(DISPATCHING, currentState -> currentState == PLANNED);
}
public void transitionToPendingRetry(TSStatus failureStatus) {
- if (queryState.get().isDone()) {
- return;
- }
this.failureStatus = failureStatus;
- queryState.set(QueryState.PENDING_RETRY);
+ queryState.setIf(PENDING_RETRY, currentState -> currentState == DISPATCHING);
}
public void transitionToRunning() {
- queryState.set(QueryState.RUNNING);
+ // if we can skipExecute in QueryExecution.start(), we will directly change from QUEUED to
+ // RUNNING
+ queryState.setIf(
+ RUNNING, currentState -> currentState == DISPATCHING || currentState == QUEUED);
}
public void transitionToFinished() {
- if (queryState.get().isDone()) {
- return;
- }
- queryState.set(QueryState.FINISHED);
+ transitionToDoneState(FINISHED);
}
public void transitionToCanceled() {
- if (queryState.get().isDone()) {
- return;
- }
- queryState.set(QueryState.CANCELED);
+ transitionToDoneState(CANCELED);
}
public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
- if (queryState.get().isDone()) {
- return;
- }
this.failureException = throwable;
this.failureStatus = failureStatus;
- queryState.set(QueryState.CANCELED);
+ transitionToDoneState(CANCELED);
}
public void transitionToAborted() {
- if (queryState.get().isDone()) {
- return;
- }
- queryState.set(QueryState.ABORTED);
+ transitionToDoneState(ABORTED);
}
public void transitionToFailed() {
- if (queryState.get().isDone()) {
- return;
- }
- queryState.set(QueryState.FAILED);
+ transitionToDoneState(FAILED);
}
public void transitionToFailed(Throwable throwable) {
- if (queryState.get().isDone()) {
- return;
- }
this.failureException = throwable;
- queryState.set(QueryState.FAILED);
+ transitionToDoneState(FAILED);
}
public void transitionToFailed(TSStatus failureStatus) {
- if (queryState.get().isDone()) {
- return;
- }
this.failureStatus = failureStatus;
- queryState.set(QueryState.FAILED);
+ transitionToDoneState(FAILED);
+ }
+
+ private void transitionToDoneState(QueryState doneState) {
+ requireNonNull(doneState, "doneState is null");
+ checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);
+
+ queryState.setIf(doneState, currentState -> !currentState.isDone());
}
public String getFailureMessage() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index b66dd4bce4..acc52ceae5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -342,8 +342,35 @@ public class SourceHandle implements ISourceHandle {
}
@Override
- public void abort(Throwable t) {
- abort();
+ public synchronized void abort(Throwable t) {
+ try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+ if (aborted || closed) {
+ return;
+ }
+ if (blocked != null && !blocked.isDone()) {
+ blocked.setException(t);
+ }
+ if (blockedOnMemory != null) {
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+ }
+ sequenceIdToDataBlockSize.clear();
+ if (bufferRetainedSizeInBytes > 0) {
+ localMemoryManager
+ .getQueryPool()
+ .free(
+ localFragmentInstanceId.getQueryId(),
+ fullFragmentInstanceId,
+ localPlanNodeId,
+ bufferRetainedSizeInBytes);
+ bufferRetainedSizeInBytes = 0;
+ }
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
+ aborted = true;
+ sourceHandleListener.onAborted(this);
+ }
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
index 339a79b45b..11c2dae5b0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
@@ -42,6 +42,8 @@ public class QueryStateMachineTest {
public void TestBasicTransition() {
QueryStateMachine stateMachine = genQueryStateMachine();
Assert.assertEquals(stateMachine.getState(), QueryState.QUEUED);
+ stateMachine.transitionToPlanned();
+ Assert.assertEquals(stateMachine.getState(), QueryState.PLANNED);
stateMachine.transitionToDispatching();
Assert.assertEquals(stateMachine.getState(), QueryState.DISPATCHING);
stateMachine.transitionToRunning();