You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 04:52:03 UTC
[iotdb] branch xingtanzjr/query_execution updated: add test for statemachine
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
new ab800c4d73 add test for statemachine
ab800c4d73 is described below
commit ab800c4d733bc1191d38cc71d416603d31b96bce
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 12:51:52 2022 +0800
add test for statemachine
---
.../iotdb/db/mpp/execution/QueryExecution.java | 12 ++-
.../iotdb/db/mpp/execution/QueryStateMachine.java | 4 +
.../iotdb/db/mpp/execution/StateMachine.java | 2 +-
.../db/mpp/execution/QueryStateMachineTest.java | 116 +++++++++++++++++++++
4 files changed, 131 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index c88512771f..7d49fe8e9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
@@ -160,12 +161,19 @@ public class QueryExecution {
public ExecutionResult getStatus() {
// Although we monitor the state to transition to RUNNING, the future will return if any
// Terminated state is triggered
- ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.RUNNING);
+ SettableFuture<QueryState> future = SettableFuture.create();
+ stateMachine.addStateChangeListener(state -> {
+ if (state == QueryState.RUNNING || state.isDone()) {
+ future.set(state);
+ }
+ });
+
try {
QueryState state = future.get();
// TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
TSStatusCode statusCode =
- state == QueryState.FINISHED
+ // For WRITE, the state should be FINISHED; For READ, the state could be RUNNING
+ state == QueryState.FINISHED || state == QueryState.RUNNING
? TSStatusCode.SUCCESS_STATUS
: TSStatusCode.QUERY_PROCESS_ERROR;
return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 66e7cbbc4d..27134951fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -83,6 +83,10 @@ public class QueryStateMachine {
return name;
}
+ public QueryState getState() {
+ return queryState.get();
+ }
+
public void transitionToPlanned() {
queryState.set(QueryState.PLANNED);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
index 39e663fda5..54592584ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
@@ -266,7 +266,7 @@ public class StateMachine<T> {
* Listener is always notified asynchronously using a dedicated notification thread pool so, care
* should be taken to avoid leaking {@code this} when adding a listener in a constructor.
* Additionally, it is possible notifications are observed out of order due to the asynchronous
- * execution. The listener is immediately notified immediately of the current state.
+ * execution. The listener is notified immediately of the current state.
*/
public void addStateChangeListener(StateChangeListener<T> stateChangeListener) {
requireNonNull(stateChangeListener, "stateChangeListener is null");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
new file mode 100644
index 0000000000..dde44f0d64
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution;
+
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class QueryStateMachineTest {
+
+ @Test
+ public void TestBasicTransition() {
+ QueryStateMachine stateMachine = genQueryStateMachine();
+ Assert.assertEquals(stateMachine.getState(), QueryState.QUEUED);
+ stateMachine.transitionToDispatching();
+ Assert.assertEquals(stateMachine.getState(), QueryState.DISPATCHING);
+ stateMachine.transitionToRunning();
+ Assert.assertEquals(stateMachine.getState(), QueryState.RUNNING);
+ stateMachine.transitionToAborted();
+ Assert.assertEquals(stateMachine.getState(), QueryState.ABORTED);
+
+ // StateMachine with Terminal State is not allowed to transition state
+ stateMachine = genQueryStateMachine();
+ stateMachine.transitionToCanceled();
+ Assert.assertEquals(stateMachine.getState(), QueryState.CANCELED);
+
+ stateMachine = genQueryStateMachine();
+ stateMachine.transitionToFinished();
+ Assert.assertEquals(stateMachine.getState(), QueryState.FINISHED);
+ }
+
+ @Test
+ public void TestFragmentInstanceToFinished() {
+ List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
+ QueryStateMachine stateMachine = genQueryStateMachine();
+ for(FragmentInstanceId id : instanceIds) {
+ stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
+ }
+ for(FragmentInstanceId id : instanceIds) {
+ stateMachine.updateFragInstanceState(id, FragmentInstanceState.FINISHED);
+ }
+ Assert.assertEquals(stateMachine.getState(), QueryState.FINISHED);
+ }
+
+ @Test
+ public void TestFragmentInstanceToTerminalState() {
+ List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
+ QueryStateMachine stateMachine = genQueryStateMachine();
+ for(FragmentInstanceId id : instanceIds) {
+ stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
+ }
+ stateMachine.updateFragInstanceState(instanceIds.get(0), FragmentInstanceState.FAILED);
+ Assert.assertEquals(stateMachine.getState(), QueryState.FAILED);
+ }
+
+ @Test
+ public void TestListener() throws ExecutionException, InterruptedException {
+ AtomicInteger stateChangeCounter = new AtomicInteger(0);
+ QueryStateMachine stateMachine = genQueryStateMachine();
+ stateMachine.addStateChangeListener(state -> {
+ stateChangeCounter.getAndIncrement();
+ });
+ stateMachine.transitionToFinished();
+ SettableFuture<QueryState> future = SettableFuture.create();
+ stateMachine.addStateChangeListener(state -> {
+ if (state == QueryState.FINISHED) {
+ future.set(QueryState.FINISHED);
+ }
+ });
+ future.get();
+ Assert.assertEquals(stateChangeCounter.get(), 2);
+ }
+
+ private QueryStateMachine genQueryStateMachine() {
+ return new QueryStateMachine(genQueryId(), IoTDBThreadPoolFactory.newSingleThreadExecutor("TestQueryStateMachine"));
+ }
+
+ private List<FragmentInstanceId> genFragmentInstanceIdList() {
+ return Arrays.asList(
+ new FragmentInstanceId(new PlanFragmentId(genQueryId(), 1), "1"),
+ new FragmentInstanceId(new PlanFragmentId(genQueryId(), 2), "1"),
+ new FragmentInstanceId(new PlanFragmentId(genQueryId(), 3), "1"),
+ new FragmentInstanceId(new PlanFragmentId(genQueryId(), 4), "1"),
+ new FragmentInstanceId(new PlanFragmentId(genQueryId(), 4), "2"));
+ }
+
+ private QueryId genQueryId() {
+ return new QueryId("test_query");
+ }
+}