You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 03:57:39 UTC
[iotdb] branch xingtanzjr/query_execution updated: add state transition in QueryStateMachine
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
new 0c17103d1b add state transition in QueryStateMachine
0c17103d1b is described below
commit 0c17103d1b225b70eee81acef8bb4f2a7e49c7ae
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 11:56:46 2022 +0800
add state transition in QueryStateMachine
---
.../db/mpp/execution/FragmentInstanceState.java | 28 +++++++++++++++-------
.../iotdb/db/mpp/execution/QueryStateMachine.java | 16 ++++++++-----
.../scheduler/AbstractFragInsStateTracker.java | 5 +++-
.../mpp/execution/scheduler/ClusterScheduler.java | 18 +++++++-------
.../scheduler/FixedRateFragInsStateTracker.java | 9 +++++--
5 files changed, 49 insertions(+), 27 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
index a98210b882..d55af83e98 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
@@ -28,41 +28,53 @@ public enum FragmentInstanceState {
* Instance is planned but has not been scheduled yet. An instance will be in the planned state
* until the dependencies of the instance have begun producing output.
*/
- PLANNED(false),
+ PLANNED(false, false),
/** Instance is running. */
- RUNNING(false),
+ RUNNING(false, false),
/**
* Instance has finished executing and output is left to be consumed. In this state, there will be
* no new drivers, the existing drivers have finished and the output buffer of the instance is
* at least in a 'no-more-tsBlocks' state.
*/
- FLUSHING(false),
+ FLUSHING(false, false),
/** Instance has finished executing and all output has been consumed. */
- FINISHED(true),
+ FINISHED(true, false),
/** Instance was canceled by a user. */
- CANCELED(true),
+ CANCELED(true, true),
/** Instance was aborted due to a failure in the query. The failure was not in this instance. */
- ABORTED(true),
+ ABORTED(true, true),
/** Instance execution failed. */
- FAILED(true);
+ FAILED(true, true);
public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
Stream.of(FragmentInstanceState.values())
.filter(FragmentInstanceState::isDone)
.collect(toImmutableSet());
+ public static final Set<FragmentInstanceState> FAILURE_INSTANCE_STATES =
+ Stream.of(FragmentInstanceState.values())
+ .filter(FragmentInstanceState::isFailed)
+ .collect(toImmutableSet());
+
/**
* True if this is a terminal state, i.e. the instance will not transfer to any other state
* once it has reached this one.
*/
private final boolean doneState;
- FragmentInstanceState(boolean doneState) {
+ private final boolean failureState;
+
+ FragmentInstanceState(boolean doneState, boolean failureState) {
this.doneState = doneState;
+ this.failureState = failureState;
}
/** Is this a terminal state. */
public boolean isDone() {
return doneState;
}
+
+ public boolean isFailed() {
+ return failureState;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 53f33e859c..66e7cbbc4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -59,11 +58,16 @@ public class QueryStateMachine {
public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
this.fragInstanceStateMap.put(id, state);
- }
-
- // TODO: (xingtanzjr) consider more suitable method for executor initialization
- private Executor getStateMachineExecutor() {
- return IoTDBThreadPoolFactory.newSingleThreadExecutor(name);
+ // TODO: (xingtanzjr) we need to distinguish the Timeout situation
+ if (state.isFailed()) {
+ transitionToFailed();
+ }
+ boolean allFinished =
+ fragInstanceStateMap.values().stream()
+ .allMatch(currentState -> currentState == FragmentInstanceState.FINISHED);
+ if (allFinished) {
+ transitionToFinished();
+ }
}
public void addStateChangeListener(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index b7908f048b..6f64a27a1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -41,7 +41,10 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
protected List<FragmentInstance> instances;
public AbstractFragInsStateTracker(
- QueryStateMachine stateMachine, ExecutorService executor, ScheduledExecutorService scheduledExecutor, List<FragmentInstance> instances) {
+ QueryStateMachine stateMachine,
+ ExecutorService executor,
+ ScheduledExecutorService scheduledExecutor,
+ List<FragmentInstance> instances) {
this.stateMachine = stateMachine;
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 2fa2971d0b..68c15aa4db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -66,7 +66,8 @@ public class ClusterScheduler implements IScheduler {
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.dispatcher = new SimpleFragInstanceDispatcher();
- this.stateTracker = new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+ this.stateTracker =
+ new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
}
@Override
@@ -116,7 +117,8 @@ public class ClusterScheduler implements IScheduler {
@Override
public void abort() {
- // TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best practice ?
+ // TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best
+ // practice ?
dispatcher.abort();
stateTracker.abort();
}
@@ -132,18 +134,14 @@ public class ClusterScheduler implements IScheduler {
}
@Override
- public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {
- }
+ public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
@Override
- public void cancelFragment(PlanFragmentId planFragmentId) {
- }
+ public void cancelFragment(PlanFragmentId planFragmentId) {}
// Send the instances to other nodes
- private void sendFragmentInstances() {
- }
+ private void sendFragmentInstances() {}
// After sending, start to collect the states of these fragment instances
- private void startMonitorInstances() {
- }
+ private void startMonitorInstances() {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index cc993298d2..0fb90f160a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -34,13 +34,18 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
private ScheduledFuture<?> trackTask;
public FixedRateFragInsStateTracker(
- QueryStateMachine stateMachine, ExecutorService executor, ScheduledExecutorService scheduledExecutor, List<FragmentInstance> instances) {
+ QueryStateMachine stateMachine,
+ ExecutorService executor,
+ ScheduledExecutorService scheduledExecutor,
+ List<FragmentInstance> instances) {
super(stateMachine, executor, scheduledExecutor, instances);
}
@Override
public void start() {
- trackTask = scheduledExecutor.scheduleAtFixedRate(this::fetchStateAndUpdate, 0, STATE_FETCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+ trackTask =
+ scheduledExecutor.scheduleAtFixedRate(
+ this::fetchStateAndUpdate, 0, STATE_FETCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
}
@Override