You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 04:52:03 UTC

[iotdb] branch xingtanzjr/query_execution updated: add test for statemachine

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
     new ab800c4d73 add test for statemachine
ab800c4d73 is described below

commit ab800c4d733bc1191d38cc71d416603d31b96bce
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 12:51:52 2022 +0800

    add test for statemachine
---
 .../iotdb/db/mpp/execution/QueryExecution.java     |  12 ++-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |   4 +
 .../iotdb/db/mpp/execution/StateMachine.java       |   2 +-
 .../db/mpp/execution/QueryStateMachineTest.java    | 116 +++++++++++++++++++++
 4 files changed, 131 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index c88512771f..7d49fe8e9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
@@ -160,12 +161,19 @@ public class QueryExecution {
   public ExecutionResult getStatus() {
     // Although we monitor the state to transition to RUNNING, the future will return if any
     // Terminated state is triggered
-    ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.RUNNING);
+    SettableFuture<QueryState> future = SettableFuture.create();
+    stateMachine.addStateChangeListener(state -> {
+      if (state == QueryState.RUNNING || state.isDone()) {
+        future.set(state);
+      }
+    });
+
     try {
       QueryState state = future.get();
       // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
       TSStatusCode statusCode =
-          state == QueryState.FINISHED
+          // A WRITE is expected to be FINISHED here, while a READ may still be RUNNING; both succeed
+          state == QueryState.FINISHED || state == QueryState.RUNNING
               ? TSStatusCode.SUCCESS_STATUS
               : TSStatusCode.QUERY_PROCESS_ERROR;
       return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 66e7cbbc4d..27134951fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -83,6 +83,10 @@ public class QueryStateMachine {
     return name;
   }
 
+  public QueryState getState() {
+    return queryState.get();
+  }
+
   public void transitionToPlanned() {
     queryState.set(QueryState.PLANNED);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
index 39e663fda5..54592584ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
@@ -266,7 +266,7 @@ public class StateMachine<T> {
    * Listener is always notified asynchronously using a dedicated notification thread pool so, care
    * should be taken to avoid leaking {@code this} when adding a listener in a constructor.
    * Additionally, it is possible notifications are observed out of order due to the asynchronous
-   * execution. The listener is immediately notified immediately of the current state.
+   * execution. The listener is notified immediately of the current state.
    */
   public void addStateChangeListener(StateChangeListener<T> stateChangeListener) {
     requireNonNull(stateChangeListener, "stateChangeListener is null");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
new file mode 100644
index 0000000000..dde44f0d64
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution;
+
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Unit tests for {@link QueryStateMachine}: state transitions and listener notification. */
+public class QueryStateMachineTest {
+  // Expects the state to follow QUEUED -> DISPATCHING -> RUNNING -> ABORTED, one call at a time
+  @Test
+  public void TestBasicTransition() {
+    QueryStateMachine stateMachine = genQueryStateMachine();
+    Assert.assertEquals(QueryState.QUEUED, stateMachine.getState());
+    stateMachine.transitionToDispatching();
+    Assert.assertEquals(QueryState.DISPATCHING, stateMachine.getState());
+    stateMachine.transitionToRunning();
+    Assert.assertEquals(QueryState.RUNNING, stateMachine.getState());
+    stateMachine.transitionToAborted();
+    Assert.assertEquals(QueryState.ABORTED, stateMachine.getState());
+    // A freshly created state machine can also go straight to CANCELED or FINISHED
+    stateMachine = genQueryStateMachine();
+    stateMachine.transitionToCanceled();
+    Assert.assertEquals(QueryState.CANCELED, stateMachine.getState());
+    stateMachine = genQueryStateMachine();
+    stateMachine.transitionToFinished();
+    Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+  }
+
+  // Expects the query to be FINISHED once every fragment instance was updated to FINISHED
+  @Test
+  public void TestFragmentInstanceToFinished() {
+    List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
+    QueryStateMachine stateMachine = genQueryStateMachine();
+    for (FragmentInstanceId id : instanceIds) {
+      stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
+    }
+    for (FragmentInstanceId id : instanceIds) {
+      stateMachine.updateFragInstanceState(id, FragmentInstanceState.FINISHED);
+    }
+    Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+  }
+
+  // Expects one FAILED fragment instance to be enough to move the whole query to FAILED
+  @Test
+  public void TestFragmentInstanceToTerminalState() {
+    List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
+    QueryStateMachine stateMachine = genQueryStateMachine();
+    for (FragmentInstanceId id : instanceIds) {
+      stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
+    }
+    stateMachine.updateFragInstanceState(instanceIds.get(0), FragmentInstanceState.FAILED);
+    Assert.assertEquals(QueryState.FAILED, stateMachine.getState());
+  }
+
+  // Expects the first listener to fire twice: on registration (current state) and on FINISHED
+  @Test
+  public void TestListener() throws ExecutionException, InterruptedException {
+    AtomicInteger stateChangeCounter = new AtomicInteger(0);
+    QueryStateMachine stateMachine = genQueryStateMachine();
+    stateMachine.addStateChangeListener(state -> stateChangeCounter.getAndIncrement());
+    stateMachine.transitionToFinished();
+    SettableFuture<QueryState> future = SettableFuture.create();
+    stateMachine.addStateChangeListener(state -> {
+      if (state == QueryState.FINISHED) {
+        future.set(QueryState.FINISHED);
+      }
+    });
+    future.get();
+    Assert.assertEquals(2, stateChangeCounter.get());
+  }
+
+  private QueryStateMachine genQueryStateMachine() {
+    return new QueryStateMachine(genQueryId(), IoTDBThreadPoolFactory.newSingleThreadExecutor("TestQueryStateMachine"));
+  }
+
+  private List<FragmentInstanceId> genFragmentInstanceIdList() {
+    return Arrays.asList(
+        new FragmentInstanceId(new PlanFragmentId(genQueryId(), 1), "1"),
+        new FragmentInstanceId(new PlanFragmentId(genQueryId(), 2), "1"),
+        new FragmentInstanceId(new PlanFragmentId(genQueryId(), 3), "1"),
+        new FragmentInstanceId(new PlanFragmentId(genQueryId(), 4), "1"),
+        new FragmentInstanceId(new PlanFragmentId(genQueryId(), 4), "2"));
+  }
+
+  private QueryId genQueryId() {
+    return new QueryId("test_query");
+  }
+}