You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/25 14:37:24 UTC

[iotdb] branch xingtanzjr/query_execution created (now f4f0e02)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at f4f0e02  complete state machine implementation

This branch includes the following new commits:

     new f4f0e02  complete state machine implementation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: complete state machine implementation

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f4f0e02cebd977c896a9c69be7c0ecaa1e11b9f0
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Mar 25 22:36:14 2022 +0800

    complete state machine implementation
---
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  18 +-
 .../{ExecutionResult.java => ExecutionStatus.java} |   4 +-
 .../iotdb/db/mpp/execution/FutureStateChange.java  |  78 +++++
 .../iotdb/db/mpp/execution/QueryExecution.java     |  35 ++-
 .../{QueryStateMachine.java => QueryState.java}    |  36 ++-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  67 ++++-
 .../iotdb/db/mpp/execution/StateMachine.java       | 324 +++++++++++++++++++++
 .../mpp/execution/scheduler/ClusterScheduler.java  |  25 +-
 .../FragInstanceDispatchResult.java}               |  19 +-
 .../IFragInstanceDispatcher.java}                  |  21 +-
 .../db/mpp/execution/scheduler/IScheduler.java     |   2 +-
 ...uler.java => InternalServiceClientFactory.java} |  37 ++-
 .../scheduler/SimpleFragInstanceDispatcher.java    |  50 ++++
 .../execution/scheduler/StandaloneScheduler.java   |   2 +-
 .../db/service/thrift/impl/TSServiceImpl.java      |   4 +-
 15 files changed, 662 insertions(+), 60 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index b2025bf..3ce8f7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -33,21 +33,19 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class Coordinator {
 
+  private static final Coordinator INSTANCE = new Coordinator();
+
   private ConcurrentHashMap<QueryId, QueryExecution> queryExecutionMap;
 
-  public static Coordinator getInstance() {
-    return new Coordinator();
+  private Coordinator() {
+    this.queryExecutionMap = new ConcurrentHashMap<>();
   }
 
   private QueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
     return new QueryExecution(statement, queryContext);
   }
 
-  private QueryExecution getQueryExecutionById() {
-    return null;
-  }
-
-  public ExecutionResult execute(
+  public ExecutionStatus execute(
       Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
 
     QueryExecution execution =
@@ -56,7 +54,11 @@ public class Coordinator {
 
     execution.start();
 
-    return execution.getResult();
+    return execution.getStatus();
+  }
+
+  public static Coordinator getInstance() {
+    return INSTANCE;
   }
 
   //    private TQueryResponse executeQuery(TQueryRequest request) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java
index 4dae202..8c24ade 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.db.mpp.execution;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-public class ExecutionResult {
+public class ExecutionStatus {
   public QueryId queryId;
   public TSStatus status;
 
-  public ExecutionResult(QueryId queryId, TSStatus status) {
+  public ExecutionStatus(QueryId queryId, TSStatus status) {
     this.queryId = queryId;
     this.status = status;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
new file mode 100644
index 0000000..55fc2da
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static java.util.Objects.requireNonNull;
+
+@ThreadSafe
+public class FutureStateChange<T>
+{
+    // Use a separate future for each listener so canceled listeners can be removed
+    @GuardedBy("listeners")
+    private final Set<SettableFuture<T>> listeners = new HashSet<>();
+
+    public ListenableFuture<T> createNewListener()
+    {
+        SettableFuture<T> listener = SettableFuture.create();
+        synchronized (listeners) {
+            listeners.add(listener);
+        }
+
+        // remove the listener when the future completes
+        listener.addListener(
+                () -> {
+                    synchronized (listeners) {
+                        listeners.remove(listener);
+                    }
+                },
+                directExecutor());
+
+        return listener;
+    }
+
+    public void complete(T newState)
+    {
+        fireStateChange(newState, directExecutor());
+    }
+
+    public void complete(T newState, Executor executor)
+    {
+        fireStateChange(newState, executor);
+    }
+
+    private void fireStateChange(T newState, Executor executor)
+    {
+        requireNonNull(executor, "executor is null");
+        Set<SettableFuture<T>> futures;
+        synchronized (listeners) {
+            futures = ImmutableSet.copyOf(listeners);
+            listeners.clear();
+        }
+
+        for (SettableFuture<T> future : futures) {
+            executor.execute(() -> future.set(newState));
+        }
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0b15ef1..4a809a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
@@ -28,10 +29,12 @@ import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.*;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.iotdb.rpc.RpcUtils.getStatus;
 
@@ -51,11 +54,15 @@ public class QueryExecution {
   private final Analysis analysis;
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
-  private List<FragmentInstance> fragmentInstances;
 
   public QueryExecution(Statement statement, MPPQueryContext context) {
     this.context = context;
     this.analysis = analyze(statement, context);
+    this.stateMachine = new QueryStateMachine(context.getQueryId());
+    // TODO: (xingtanzjr) initialize the query scheduler according to configuration
+    this.scheduler = new ClusterScheduler(stateMachine, distributedPlan.getInstances());
+
+    // TODO: register callbacks in QueryStateMachine when the QueryExecution is aborted/finished
   }
 
   public void start() {
@@ -71,7 +78,7 @@ public class QueryExecution {
   }
 
   private void schedule() {
-    this.scheduler = new ClusterScheduler(this.stateMachine, this.fragmentInstances);
+    this.scheduler = new ClusterScheduler(this.stateMachine, this.distributedPlan.getInstances());
     this.scheduler.start();
   }
 
@@ -90,7 +97,8 @@ public class QueryExecution {
   /**
    * This method will be called by the request thread from client connection. This method will block
    * until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
-   * 3. The query has been cancelled 4. The query is timeout This method will fetch the result from
+   * 3. The query has been cancelled 4. The query is timeout This method wil
+   * l fetch the result from
    * DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
    * implemented with DataStreamManager)
    */
@@ -98,8 +106,23 @@ public class QueryExecution {
     return null;
   }
 
-  public ExecutionResult getResult() {
-
-    return new ExecutionResult(context.getQueryId(), getStatus(TSStatusCode.SUCCESS_STATUS));
+  /**
+   * This method is a synchronized method.
+   * For READ, it will block until all the FragmentInstances have been submitted.
+   * For WRITE, it will block until all the FragmentInstances have finished.
+   * @return ExecutionStatus. Contains the QueryId and the TSStatus.
+   */
+  public ExecutionStatus getStatus() {
+    // Although we monitor the state to transition to FINISHED, the future will return if any Terminated state is triggered
+    ListenableFuture<QueryState> future =  stateMachine.getStateChange(QueryState.FINISHED);
+    try {
+      QueryState state = future.get();
+      // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
+      TSStatusCode statusCode = state == QueryState.FINISHED ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
+      return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(statusCode));
+    } catch (InterruptedException | ExecutionException e) {
+      // TODO: (xingtanzjr) use more accurate error handling
+      return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index d8ca6bd..b585ab2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -16,10 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution;
 
-/**
- * State machine for a QueryExecution. It stores the states for the QueryExecution. Others can
- * register listeners when the state changes of the QueryExecution.
- */
-public class QueryStateMachine {}
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+
+public enum QueryState {
+    QUEUED(false),
+    PLANNED(false),
+    DISPATCHING(false),
+    RUNNING(false),
+    FINISHED(true),
+    CANCELED(true),
+    ABORTED(true),
+    FAILED(true);
+
+    private final boolean doneState;
+
+    public static final Set<QueryState> TERMINAL_INSTANCE_STATES =
+            Stream.of(QueryState.values())
+                    .filter(QueryState::isDone)
+                    .collect(toImmutableSet());
+
+    QueryState(boolean doneState) {
+        this.doneState = doneState;
+    }
+
+    public boolean isDone() {
+        return doneState;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index d8ca6bd..3585009 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -18,8 +18,73 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.QueryId;
+
+import java.util.concurrent.Executor;
+
 /**
  * State machine for a QueryExecution. It stores the states for the QueryExecution. Others can
  * register listeners when the state changes of the QueryExecution.
  */
-public class QueryStateMachine {}
+public class QueryStateMachine {
+    private String name;
+    private StateMachine<QueryState> queryState;
+
+    // The executor will be used in all the state machines belonged to this query.
+    private Executor stateMachineExecutor;
+
+    public QueryStateMachine(QueryId queryId) {
+        this.name = String.format("QueryStateMachine[%s]", queryId);
+        this.stateMachineExecutor = getStateMachineExecutor();
+        queryState = new StateMachine<>(queryId.toString(), this.stateMachineExecutor ,QueryState.QUEUED, QueryState.TERMINAL_INSTANCE_STATES);
+    }
+
+    // TODO: (xingtanzjr) consider more suitable method for executor initialization
+    private Executor getStateMachineExecutor() {
+        return IoTDBThreadPoolFactory.newSingleThreadExecutor(name);
+    }
+
+    public void addStateChangeListener(StateMachine.StateChangeListener<QueryState> stateChangeListener)
+    {
+        queryState.addStateChangeListener(stateChangeListener);
+    }
+
+    public ListenableFuture<QueryState> getStateChange(QueryState currentState)
+    {
+        return queryState.getStateChange(currentState);
+    }
+
+    private String getName() {
+        return name;
+    }
+
+    public void transitionToPlanned() {
+        queryState.set(QueryState.PLANNED);
+    }
+
+    public void transitionToDispatching() {
+        queryState.set(QueryState.DISPATCHING);
+    }
+
+    public void transitionToRunning() {
+        queryState.set(QueryState.RUNNING);
+    }
+
+    public void transitionToFinished() {
+        queryState.set(QueryState.FINISHED);
+    }
+
+    public void transitionToCanceled() {
+        queryState.set(QueryState.CANCELED);
+    }
+
+    public void transitionToAborted() {
+        queryState.set(QueryState.ABORTED);
+    }
+
+    public void transitionToFailed() {
+        queryState.set(QueryState.FAILED);
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
new file mode 100644
index 0000000..78d7bad
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Simple state machine which holds a single state. Callers can register for state change events.
+ */
+@ThreadSafe
+public class StateMachine<T>
+{
+    private final String name;
+    private final Executor executor;
+    private final Object lock = new Object();
+    private final Set<T> terminalStates;
+
+    @GuardedBy("lock")
+    private volatile T state;
+
+    @GuardedBy("lock")
+    private final List<StateChangeListener<T>> stateChangeListeners = new ArrayList<>();
+
+    private final AtomicReference<FutureStateChange<T>> futureStateChange = new AtomicReference<>(new FutureStateChange<>());
+
+    /**
+     * Creates a state machine with the specified initial state and no terminal states.
+     *
+     * @param name name of this state machine to use in debug statements
+     * @param executor executor for firing state change events; must not be a same thread executor
+     * @param initialState the initial state
+     */
+    public StateMachine(String name, Executor executor, T initialState)
+    {
+        this(name, executor, initialState, ImmutableSet.of());
+    }
+
+    /**
+     * Creates a state machine with the specified initial state and terminal states.
+     *
+     * @param name name of this state machine to use in debug statements
+     * @param executor executor for firing state change events; must not be a same thread executor
+     * @param initialState the initial state
+     * @param terminalStates the terminal states
+     */
+    public StateMachine(String name, Executor executor, T initialState, Iterable<T> terminalStates)
+    {
+        this.name = requireNonNull(name, "name is null");
+        this.executor = requireNonNull(executor, "executor is null");
+        this.state = requireNonNull(initialState, "initialState is null");
+        this.terminalStates = ImmutableSet.copyOf(requireNonNull(terminalStates, "terminalStates is null"));
+    }
+
+    // state changes are atomic and state is volatile, so a direct read is safe here
+    @SuppressWarnings("FieldAccessNotGuarded")
+    public T get()
+    {
+        return state;
+    }
+
+    /**
+     * Sets the state.
+     * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
+     *
+     * @return the old state
+     * @throws IllegalStateException if state change would cause a transition from a terminal state
+     */
+    public T set(T newState)
+    {
+        T oldState = trySet(newState);
+        checkState(oldState.equals(newState) || !isTerminalState(oldState), "%s cannot transition from %s to %s", name, state, newState);
+        return oldState;
+    }
+
+    /**
+     * Tries to change the state.  State will not change if the new state {@code .equals()} the current state,
+     * of if the current state is a terminal state. If the state changed, listeners and waiters will be notified.
+     *
+     * @return the state before the possible state change
+     */
+    public T trySet(T newState)
+    {
+        checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+        requireNonNull(newState, "newState is null");
+
+        T oldState;
+        FutureStateChange<T> futureStateChange;
+        ImmutableList<StateChangeListener<T>> stateChangeListeners;
+        synchronized (lock) {
+            if (state.equals(newState) || isTerminalState(state)) {
+                return state;
+            }
+
+            oldState = state;
+            state = newState;
+
+            futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
+            stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
+
+            // if we are now in a terminal state, free the listeners since this will be the last notification
+            if (isTerminalState(state)) {
+                this.stateChangeListeners.clear();
+            }
+        }
+
+        fireStateChanged(newState, futureStateChange, stateChangeListeners);
+        return oldState;
+    }
+
+    /**
+     * Sets the state if the current state satisfies the specified predicate.
+     * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
+     *
+     * @return true if the state is set
+     */
+    public boolean setIf(T newState, Predicate<T> predicate)
+    {
+        checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+        requireNonNull(newState, "newState is null");
+
+        while (true) {
+            // check if the current state passes the predicate
+            T currentState = get();
+
+            // change to same state is not a change, and does not notify the notify listeners
+            if (currentState.equals(newState)) {
+                return false;
+            }
+
+            // do not call predicate while holding the lock
+            if (!predicate.test(currentState)) {
+                return false;
+            }
+
+            // if state did not change while, checking the predicate, apply the new state
+            if (compareAndSet(currentState, newState)) {
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Sets the state if the current state {@code .equals()} the specified expected state.
+     * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
+     *
+     * @return true if the state is set
+     */
+    public boolean compareAndSet(T expectedState, T newState)
+    {
+        checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+        requireNonNull(expectedState, "expectedState is null");
+        requireNonNull(newState, "newState is null");
+
+        FutureStateChange<T> futureStateChange;
+        ImmutableList<StateChangeListener<T>> stateChangeListeners;
+        synchronized (lock) {
+            if (!state.equals(expectedState)) {
+                return false;
+            }
+
+            // change to same state is not a change, and does not notify the notify listeners
+            if (state.equals(newState)) {
+                return false;
+            }
+
+            checkState(!isTerminalState(state), "%s cannot transition from %s to %s", name, state, newState);
+
+            state = newState;
+
+            futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
+            stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
+
+            // if we are now in a terminal state, free the listeners since this will be the last notification
+            if (isTerminalState(state)) {
+                this.stateChangeListeners.clear();
+            }
+        }
+
+        fireStateChanged(newState, futureStateChange, stateChangeListeners);
+        return true;
+    }
+
+    private void fireStateChanged(T newState, FutureStateChange<T> futureStateChange, List<StateChangeListener<T>> stateChangeListeners)
+    {
+        checkState(!Thread.holdsLock(lock), "Cannot fire state change event while holding the lock");
+        requireNonNull(newState, "newState is null");
+
+        // always fire listener callbacks from a different thread
+        safeExecute(() -> {
+            checkState(!Thread.holdsLock(lock), "Cannot notify while holding the lock");
+            try {
+                futureStateChange.complete(newState);
+            }
+            catch (Throwable e) {
+//                log.error(e, "Error setting future state for %s", name);
+            }
+            for (StateChangeListener<T> stateChangeListener : stateChangeListeners) {
+                fireStateChangedListener(newState, stateChangeListener);
+            }
+        });
+    }
+
+    private void fireStateChangedListener(T newState, StateChangeListener<T> stateChangeListener)
+    {
+        try {
+            stateChangeListener.stateChanged(newState);
+        }
+        catch (Throwable e) {
+//            log.error(e, "Error notifying state change listener for %s", name);
+        }
+    }
+
+    /**
+     * Gets a future that completes when the state is no longer {@code .equals()} to {@code currentState)}.
+     */
+    public ListenableFuture<T> getStateChange(T currentState)
+    {
+        checkState(!Thread.holdsLock(lock), "Cannot wait for state change while holding the lock");
+        requireNonNull(currentState, "currentState is null");
+
+        synchronized (lock) {
+            // return a completed future if the state has already changed, or we are in a terminal state
+            if (!state.equals(currentState) || isTerminalState(state)) {
+                return immediateFuture(state);
+            }
+
+            return futureStateChange.get().createNewListener();
+        }
+    }
+
+    /**
+     * Adds a listener to be notified when the state instance changes according to {@code .equals()}.
+     * Listener is always notified asynchronously using a dedicated notification thread pool so, care should
+     * be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
+     * possible notifications are observed out of order due to the asynchronous execution. The listener is
+     * immediately notified immediately of the current state.
+     */
+    public void addStateChangeListener(StateChangeListener<T> stateChangeListener)
+    {
+        requireNonNull(stateChangeListener, "stateChangeListener is null");
+
+        boolean inTerminalState;
+        T currentState;
+        synchronized (lock) {
+            currentState = state;
+            inTerminalState = isTerminalState(currentState);
+            if (!inTerminalState) {
+                stateChangeListeners.add(stateChangeListener);
+            }
+        }
+
+        // fire state change listener with the current state
+        // always fire listener callbacks from a different thread
+        safeExecute(() -> stateChangeListener.stateChanged(currentState));
+    }
+
+    @VisibleForTesting
+    boolean isTerminalState(T state)
+    {
+        return terminalStates.contains(state);
+    }
+
+    @VisibleForTesting
+    List<StateChangeListener<T>> getStateChangeListeners()
+    {
+        synchronized (lock) {
+            return ImmutableList.copyOf(stateChangeListeners);
+        }
+    }
+
+    public interface StateChangeListener<T>
+    {
+        void stateChanged(T newState);
+    }
+
+    @Override
+    public String toString()
+    {
+        return get().toString();
+    }
+
+    private void safeExecute(Runnable command)
+    {
+        try {
+            executor.execute(command);
+        }
+        catch (RejectedExecutionException e) {
+            if ((executor instanceof ExecutorService) && ((ExecutorService) executor).isShutdown()) {
+                // TODO: (xingtanzjr) handle the exception
+                throw new RuntimeException("Server is shutting down", e);
+            }
+            throw e;
+        }
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 0a18d07..820da6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -21,12 +21,16 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
+import org.apache.iotdb.db.mpp.execution.QueryState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
 import io.airlift.units.Duration;
 
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
 /**
  * QueryScheduler is used to dispatch the fragment instances of a query to target nodes. And it will
@@ -41,14 +45,31 @@ public class ClusterScheduler implements IScheduler {
 
   // The fragment instances which should be sent to corresponding Nodes.
   private List<FragmentInstance> instances;
+  private IFragInstanceDispatcher dispatcher;
 
   public ClusterScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances) {
     this.stateMachine = stateMachine;
     this.instances = instances;
+    this.dispatcher = new SimpleFragInstanceDispatcher();
   }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO: consider where the state transition should be put
+    stateMachine.transitionToDispatching();
+    Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
+    //TODO: start the FragmentInstance state fetcher
+    try {
+      FragInstanceDispatchResult result = dispatchResultFuture.get();
+      if (result.isSuccessful()) {
+        stateMachine.transitionToRunning();
+      } else {
+        stateMachine.transitionToFailed();
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      e.printStackTrace();
+    }
+  }
 
   @Override
   public void abort() {}
@@ -64,7 +85,7 @@ public class ClusterScheduler implements IScheduler {
   }
 
   @Override
-  public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
+  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
 
   @Override
   public void cancelFragment(PlanFragmentId planFragmentId) {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
index d8ca6bd..8108c9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
@@ -16,10 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution;
 
-/**
- * State machine for a QueryExecution. It stores the states for the QueryExecution. Others can
- * register listeners when the state changes of the QueryExecution.
- */
-public class QueryStateMachine {}
+package org.apache.iotdb.db.mpp.execution.scheduler;
+
+/**
+ * Outcome of dispatching a batch of fragment instances; delivered through the {@code Future}
+ * returned by {@link IFragInstanceDispatcher#dispatch}.
+ */
+public class FragInstanceDispatchResult {
+  private final boolean successful;
+
+  /**
+   * @param successful {@code true} if the dispatch succeeded
+   */
+  public FragInstanceDispatchResult(boolean successful) {
+    this.successful = successful;
+  }
+
+  /** Returns {@code true} if the dispatch succeeded. */
+  public boolean isSuccessful() {
+    return successful;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
similarity index 64%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
index d8ca6bd..ce3b2d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
@@ -16,10 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution;
 
-/**
- * State machine for a QueryExecution. It stores the states for the QueryExecution. Others can
- * register listeners when the state changes of the QueryExecution.
- */
-public class QueryStateMachine {}
+package org.apache.iotdb.db.mpp.execution.scheduler;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+/** Dispatches fragment instances for execution. */
+public interface IFragInstanceDispatcher {
+  /**
+   * Dispatches all fragment instances asynchronously.
+   *
+   * @param instances fragment instances to dispatch
+   * @return a future that completes with the outcome of dispatching the whole list
+   */
+  Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
index 1ecd4e6..2c4892d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
@@ -34,7 +34,7 @@ public interface IScheduler {
 
   FragmentInfo getFragmentInfo();
 
-  void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
+  /**
+   * Intended to abort the fragment instance identified by {@code instanceId}.
+   *
+   * <p>NOTE(review): renamed from {@code failFragmentInstance}. The ClusterScheduler and
+   * StandaloneScheduler implementations are still empty, so the exact contract (including how
+   * {@code failureCause} is used) is not pinned down yet — confirm.
+   *
+   * @param instanceId the fragment instance to abort
+   * @param failureCause presumably the error that triggered the abort — TODO confirm
+   */
+  void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
 
   void cancelFragment(PlanFragmentId planFragmentId);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
index 1ecd4e6..d79b66c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,25 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.scheduler;
-
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.execution.FragmentInfo;
-
-import io.airlift.units.Duration;
-
-public interface IScheduler {
-
-  void start();
 
-  void abort();
-
-  Duration getTotalCpuTime();
-
-  FragmentInfo getFragmentInfo();
-
-  void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
+package org.apache.iotdb.db.mpp.execution.scheduler;
 
-  void cancelFragment(PlanFragmentId planFragmentId);
+import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/** Creates Thrift clients for the internal RPC service. */
+public final class InternalServiceClientFactory {
+
+  private InternalServiceClientFactory() {
+    // static factory only; not instantiable
+  }
+
+  // TODO: (xingtanzjr) consider the best practice to maintain the clients
+  /**
+   * Opens a new binary-protocol connection to {@code endpoint:port} and wraps it in a client.
+   *
+   * <p>Every call opens a fresh socket. The caller owns it and must close it when done, e.g.
+   * with {@code client.getInputProtocol().getTransport().close()}.
+   *
+   * @param endpoint host name or IP address to connect to
+   * @param port port to connect to
+   * @return a client bound to a newly opened transport
+   * @throws TTransportException if the transport cannot be created or opened
+   */
+  public static InternalService.Client getInternalServiceClient(String endpoint, int port)
+      throws TTransportException {
+    TTransport transport = new TSocket(endpoint, port);
+    transport.open();
+    TProtocol protocol = new TBinaryProtocol(transport);
+    return new InternalService.Client(protocol);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
new file mode 100644
index 0000000..ffe4758
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.scheduler;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.thrift.TException;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+/**
+ * Dispatches fragment instances one at a time on a background thread. Each instance is sent over
+ * a fresh Thrift connection to its host endpoint.
+ *
+ * <p>Dispatch stops at the first RPC failure; instances that were already sent are not rolled
+ * back.
+ */
+public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
+
+  @Override
+  public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
+    FutureTask<FragInstanceDispatchResult> dispatchTask =
+        new FutureTask<>(
+            () -> {
+              try {
+                for (FragmentInstance instance : instances) {
+                  sendOne(instance);
+                }
+              } catch (TException e) {
+                // TODO: (xingtanzjr) add more details
+                return new FragInstanceDispatchResult(false);
+              }
+              return new FragInstanceDispatchResult(true);
+            });
+    // A named thread makes the dispatch easy to identify in thread dumps.
+    new Thread(dispatchTask, "frag-instance-dispatcher").start();
+    return dispatchTask;
+  }
+
+  /** Sends one fragment instance to its host endpoint and always closes the connection. */
+  private static void sendOne(FragmentInstance instance) throws TException {
+    InternalService.Client client =
+        InternalServiceClientFactory.getInternalServiceClient(
+            instance.getHostEndpoint().ip, instance.getHostEndpoint().port);
+    try {
+      // TODO: build the real request from the instance; null is a placeholder.
+      client.sendFragmentInstance(null);
+    } finally {
+      // getInternalServiceClient opens a new socket per call; close it so connections don't leak.
+      client.getInputProtocol().getTransport().close();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index d876561..8be3082 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -49,7 +49,7 @@ public class StandaloneScheduler implements IScheduler {
   }
 
+  // TODO(review): no-op so far; instanceId and failureCause are ignored.
   @Override
-  public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
+  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
 
+  // TODO(review): no-op so far; planFragmentId is ignored.
   @Override
   public void cancelFragment(PlanFragmentId planFragmentId) {}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 04b58a3..b996827 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.Coordinator;
-import org.apache.iotdb.db.mpp.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.execution.ExecutionStatus;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
@@ -1633,7 +1633,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionResult result =
+      ExecutionStatus result =
           coordinator.execute(
               statement,
               new QueryId(String.valueOf(queryId)),