You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 03:57:39 UTC

[iotdb] branch xingtanzjr/query_execution updated: add state transition in QueryStateMachine

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
     new 0c17103d1b add state transition in QueryStateMachine
0c17103d1b is described below

commit 0c17103d1b225b70eee81acef8bb4f2a7e49c7ae
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 11:56:46 2022 +0800

    add state transition in QueryStateMachine
---
 .../db/mpp/execution/FragmentInstanceState.java    | 28 +++++++++++++++-------
 .../iotdb/db/mpp/execution/QueryStateMachine.java  | 16 ++++++++-----
 .../scheduler/AbstractFragInsStateTracker.java     |  5 +++-
 .../mpp/execution/scheduler/ClusterScheduler.java  | 18 +++++++-------
 .../scheduler/FixedRateFragInsStateTracker.java    |  9 +++++--
 5 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
index a98210b882..d55af83e98 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
@@ -28,41 +28,53 @@ public enum FragmentInstanceState {
    * Instance is planned but has not been scheduled yet. An instance will be in the planned state
    * until, the dependencies of the instance have begun producing output.
    */
-  PLANNED(false),
+  PLANNED(false, false),
   /** Instance is running. */
-  RUNNING(false),
+  RUNNING(false, false),
   /**
    * Instance has finished executing and output is left to be consumed. In this state, there will be
    * no new drivers, the existing drivers have finished and the output buffer of the instance is
    * at-least in a 'no-more-tsBlocks' state.
    */
-  FLUSHING(false),
+  FLUSHING(false, false),
   /** Instance has finished executing and all output has been consumed. */
-  FINISHED(true),
+  FINISHED(true, false),
   /** Instance was canceled by a user. */
-  CANCELED(true),
+  CANCELED(true, true),
   /** Instance was aborted due to a failure in the query. The failure was not in this instance. */
-  ABORTED(true),
+  ABORTED(true, true),
   /** Instance execution failed. */
-  FAILED(true);
+  FAILED(true, true);
 
   public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
       Stream.of(FragmentInstanceState.values())
           .filter(FragmentInstanceState::isDone)
           .collect(toImmutableSet());
 
+  public static final Set<FragmentInstanceState> FAILURE_INSTANCE_STATES =
+      Stream.of(FragmentInstanceState.values())
+          .filter(FragmentInstanceState::isFailed)
+          .collect(toImmutableSet());
+
   /**
    * If doneState is true, indicating that it won't transfer to another state anymore, i.e. a
    * terminal state.
    */
   private final boolean doneState;
 
-  FragmentInstanceState(boolean doneState) {
+  private final boolean failureState;
+
+  FragmentInstanceState(boolean doneState, boolean failureState) {
     this.doneState = doneState;
+    this.failureState = failureState;
   }
 
   /** Is this a terminal state. */
   public boolean isDone() {
     return doneState;
   }
+
+  public boolean isFailed() {
+    return failureState;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 53f33e859c..66e7cbbc4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 
@@ -59,11 +58,16 @@ public class QueryStateMachine {
 
   public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
     this.fragInstanceStateMap.put(id, state);
-  }
-
-  // TODO: (xingtanzjr) consider more suitable method for executor initialization
-  private Executor getStateMachineExecutor() {
-    return IoTDBThreadPoolFactory.newSingleThreadExecutor(name);
+    // TODO: (xingtanzjr) we need to distinguish the Timeout situation
+    if (state.isFailed()) {
+      transitionToFailed();
+    }
+    boolean allFinished =
+        fragInstanceStateMap.values().stream()
+            .allMatch(currentState -> currentState == FragmentInstanceState.FINISHED);
+    if (allFinished) {
+      transitionToFinished();
+    }
   }
 
   public void addStateChangeListener(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index b7908f048b..6f64a27a1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -41,7 +41,10 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   protected List<FragmentInstance> instances;
 
   public AbstractFragInsStateTracker(
-      QueryStateMachine stateMachine, ExecutorService executor, ScheduledExecutorService scheduledExecutor, List<FragmentInstance> instances) {
+      QueryStateMachine stateMachine,
+      ExecutorService executor,
+      ScheduledExecutorService scheduledExecutor,
+      List<FragmentInstance> instances) {
     this.stateMachine = stateMachine;
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 2fa2971d0b..68c15aa4db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -66,7 +66,8 @@ public class ClusterScheduler implements IScheduler {
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.dispatcher = new SimpleFragInstanceDispatcher();
-    this.stateTracker = new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+    this.stateTracker =
+        new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
   }
 
   @Override
@@ -116,7 +117,8 @@ public class ClusterScheduler implements IScheduler {
 
   @Override
   public void abort() {
-    // TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best practice ?
+    // TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best
+    // practice ?
     dispatcher.abort();
     stateTracker.abort();
   }
@@ -132,18 +134,14 @@ public class ClusterScheduler implements IScheduler {
   }
 
   @Override
-  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {
-  }
+  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
 
   @Override
-  public void cancelFragment(PlanFragmentId planFragmentId) {
-  }
+  public void cancelFragment(PlanFragmentId planFragmentId) {}
 
   // Send the instances to other nodes
-  private void sendFragmentInstances() {
-  }
+  private void sendFragmentInstances() {}
 
   // After sending, start to collect the states of these fragment instances
-  private void startMonitorInstances() {
-  }
+  private void startMonitorInstances() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index cc993298d2..0fb90f160a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -34,13 +34,18 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
   private ScheduledFuture<?> trackTask;
 
   public FixedRateFragInsStateTracker(
-      QueryStateMachine stateMachine, ExecutorService executor, ScheduledExecutorService scheduledExecutor, List<FragmentInstance> instances) {
+      QueryStateMachine stateMachine,
+      ExecutorService executor,
+      ScheduledExecutorService scheduledExecutor,
+      List<FragmentInstance> instances) {
     super(stateMachine, executor, scheduledExecutor, instances);
   }
 
   @Override
   public void start() {
-    trackTask = scheduledExecutor.scheduleAtFixedRate(this::fetchStateAndUpdate, 0, STATE_FETCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+    trackTask =
+        scheduledExecutor.scheduleAtFixedRate(
+            this::fetchStateAndUpdate, 0, STATE_FETCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
   }
 
   @Override