You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/25 00:12:38 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1] Fix concurrent state change bug in QueryStateMachine

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new fb1d426630 [To rel/1.1] Fix concurrent state change bug in QueryStateMachine
fb1d426630 is described below

commit fb1d42663065d46a10a895ccc6c1959400692162
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Tue Apr 25 08:12:32 2023 +0800

    [To rel/1.1] Fix concurrent state change bug in QueryStateMachine
---
 .../iotdb/db/mpp/execution/QueryStateMachine.java  | 78 ++++++++++------------
 .../execution/exchange/source/SourceHandle.java    | 31 ++++++++-
 .../db/mpp/execution/QueryStateMachineTest.java    |  2 +
 3 files changed, 66 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 53a52bd940..ffc174cbce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -27,12 +27,23 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.QueryState.ABORTED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.CANCELED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.DISPATCHING;
+import static org.apache.iotdb.db.mpp.execution.QueryState.FAILED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.FINISHED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.PENDING_RETRY;
+import static org.apache.iotdb.db.mpp.execution.QueryState.PLANNED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.QUEUED;
+import static org.apache.iotdb.db.mpp.execution.QueryState.RUNNING;
+
 /**
  * State machine for a QueryExecution. It stores the states for the QueryExecution. Others can
  * register listeners when the state changes of the QueryExecution.
  */
 public class QueryStateMachine {
-  private final String name;
   private final StateMachine<QueryState> queryState;
 
   // The executor will be used in all the state machines belonged to this query.
@@ -41,13 +52,12 @@ public class QueryStateMachine {
   private TSStatus failureStatus;
 
   public QueryStateMachine(QueryId queryId, ExecutorService executor) {
-    this.name = String.format("QueryStateMachine[%s]", queryId);
     this.stateMachineExecutor = executor;
     this.queryState =
         new StateMachine<>(
             queryId.toString(),
             this.stateMachineExecutor,
-            QueryState.QUEUED,
+            QUEUED,
             QueryState.TERMINAL_INSTANCE_STATES);
   }
 
@@ -60,89 +70,71 @@ public class QueryStateMachine {
     return queryState.getStateChange(currentState);
   }
 
-  private String getName() {
-    return name;
-  }
-
   public QueryState getState() {
     return queryState.get();
   }
 
   public void transitionToQueued() {
-    queryState.set(QueryState.QUEUED);
+    queryState.set(QUEUED);
   }
 
   public void transitionToPlanned() {
-    queryState.set(QueryState.PLANNED);
+    queryState.setIf(PLANNED, currentState -> currentState == QUEUED);
   }
 
   public void transitionToDispatching() {
-    queryState.set(QueryState.DISPATCHING);
+    queryState.setIf(DISPATCHING, currentState -> currentState == PLANNED);
   }
 
   public void transitionToPendingRetry(TSStatus failureStatus) {
-    if (queryState.get().isDone()) {
-      return;
-    }
     this.failureStatus = failureStatus;
-    queryState.set(QueryState.PENDING_RETRY);
+    queryState.setIf(PENDING_RETRY, currentState -> currentState == DISPATCHING);
   }
 
   public void transitionToRunning() {
-    queryState.set(QueryState.RUNNING);
+    // if we can skipExecute in QueryExecution.start(), we will directly change from QUEUED to
+    // RUNNING
+    queryState.setIf(
+        RUNNING, currentState -> currentState == DISPATCHING || currentState == QUEUED);
   }
 
   public void transitionToFinished() {
-    if (queryState.get().isDone()) {
-      return;
-    }
-    queryState.set(QueryState.FINISHED);
+    transitionToDoneState(FINISHED);
   }
 
   public void transitionToCanceled() {
-    if (queryState.get().isDone()) {
-      return;
-    }
-    queryState.set(QueryState.CANCELED);
+    transitionToDoneState(CANCELED);
   }
 
   public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
-    if (queryState.get().isDone()) {
-      return;
-    }
     this.failureException = throwable;
     this.failureStatus = failureStatus;
-    queryState.set(QueryState.CANCELED);
+    transitionToDoneState(CANCELED);
   }
 
   public void transitionToAborted() {
-    if (queryState.get().isDone()) {
-      return;
-    }
-    queryState.set(QueryState.ABORTED);
+    transitionToDoneState(ABORTED);
   }
 
   public void transitionToFailed() {
-    if (queryState.get().isDone()) {
-      return;
-    }
-    queryState.set(QueryState.FAILED);
+    transitionToDoneState(FAILED);
   }
 
   public void transitionToFailed(Throwable throwable) {
-    if (queryState.get().isDone()) {
-      return;
-    }
     this.failureException = throwable;
-    queryState.set(QueryState.FAILED);
+    transitionToDoneState(FAILED);
   }
 
   public void transitionToFailed(TSStatus failureStatus) {
-    if (queryState.get().isDone()) {
-      return;
-    }
     this.failureStatus = failureStatus;
-    queryState.set(QueryState.FAILED);
+    transitionToDoneState(FAILED);
+  }
+
+  private void transitionToDoneState(QueryState doneState) {
+    requireNonNull(doneState, "doneState is null");
+    checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);
+
+    queryState.setIf(doneState, currentState -> !currentState.isDone());
   }
 
   public String getFailureMessage() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index b66dd4bce4..acc52ceae5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -342,8 +342,35 @@ public class SourceHandle implements ISourceHandle {
   }
 
   @Override
-  public void abort(Throwable t) {
-    abort();
+  public synchronized void abort(Throwable t) {
+    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+      if (aborted || closed) {
+        return;
+      }
+      if (blocked != null && !blocked.isDone()) {
+        blocked.setException(t);
+      }
+      if (blockedOnMemory != null) {
+        bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+      }
+      sequenceIdToDataBlockSize.clear();
+      if (bufferRetainedSizeInBytes > 0) {
+        localMemoryManager
+            .getQueryPool()
+            .free(
+                localFragmentInstanceId.getQueryId(),
+                fullFragmentInstanceId,
+                localPlanNodeId,
+                bufferRetainedSizeInBytes);
+        bufferRetainedSizeInBytes = 0;
+      }
+      localMemoryManager
+          .getQueryPool()
+          .clearMemoryReservationMap(
+              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
+      aborted = true;
+      sourceHandleListener.onAborted(this);
+    }
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
index 339a79b45b..11c2dae5b0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
@@ -42,6 +42,8 @@ public class QueryStateMachineTest {
   public void TestBasicTransition() {
     QueryStateMachine stateMachine = genQueryStateMachine();
     Assert.assertEquals(stateMachine.getState(), QueryState.QUEUED);
+    stateMachine.transitionToPlanned();
+    Assert.assertEquals(stateMachine.getState(), QueryState.PLANNED);
     stateMachine.transitionToDispatching();
     Assert.assertEquals(stateMachine.getState(), QueryState.DISPATCHING);
     stateMachine.transitionToRunning();