You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 03:39:44 UTC

[iotdb] 02/04: spotless

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e01ff0c86c0ca9e0582c20cb2f75e93d7c9db295
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 10:27:14 2022 +0800

    spotless
---
 .../iotdb/db/mpp/execution/FutureStateChange.java  |  76 ++-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  52 +-
 .../apache/iotdb/db/mpp/execution/QueryState.java  |  44 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  | 145 +++---
 .../iotdb/db/mpp/execution/StateMachine.java       | 527 ++++++++++-----------
 .../scheduler/AbstractFragInsStateFetcher.java     |  45 +-
 .../mpp/execution/scheduler/ClusterScheduler.java  |  16 +-
 .../scheduler/FixedRateFragInsStateFetcher.java    |  54 ++-
 .../scheduler/FragInstanceDispatchResult.java      |  14 +-
 .../scheduler/IFragInstanceDispatcher.java         |  15 +-
 .../scheduler/IFragInstanceStateFetcher.java       |   2 +-
 .../scheduler/InternalServiceClientFactory.java    |  16 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |  38 +-
 .../planner/plan/node/sink/FragmentSinkNode.java   |   6 +-
 14 files changed, 534 insertions(+), 516 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
index 55fc2da276..0d33be9b16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
@@ -19,6 +19,7 @@ import com.google.common.util.concurrent.SettableFuture;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
+
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -27,52 +28,47 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static java.util.Objects.requireNonNull;
 
 @ThreadSafe
-public class FutureStateChange<T>
-{
-    // Use a separate future for each listener so canceled listeners can be removed
-    @GuardedBy("listeners")
-    private final Set<SettableFuture<T>> listeners = new HashSet<>();
+public class FutureStateChange<T> {
+  // Use a separate future for each listener so canceled listeners can be removed
+  @GuardedBy("listeners")
+  private final Set<SettableFuture<T>> listeners = new HashSet<>();
 
-    public ListenableFuture<T> createNewListener()
-    {
-        SettableFuture<T> listener = SettableFuture.create();
-        synchronized (listeners) {
-            listeners.add(listener);
-        }
+  public ListenableFuture<T> createNewListener() {
+    SettableFuture<T> listener = SettableFuture.create();
+    synchronized (listeners) {
+      listeners.add(listener);
+    }
 
-        // remove the listener when the future completes
-        listener.addListener(
-                () -> {
-                    synchronized (listeners) {
-                        listeners.remove(listener);
-                    }
-                },
-                directExecutor());
+    // remove the listener when the future completes
+    listener.addListener(
+        () -> {
+          synchronized (listeners) {
+            listeners.remove(listener);
+          }
+        },
+        directExecutor());
 
-        return listener;
-    }
+    return listener;
+  }
 
-    public void complete(T newState)
-    {
-        fireStateChange(newState, directExecutor());
-    }
+  public void complete(T newState) {
+    fireStateChange(newState, directExecutor());
+  }
 
-    public void complete(T newState, Executor executor)
-    {
-        fireStateChange(newState, executor);
-    }
+  public void complete(T newState, Executor executor) {
+    fireStateChange(newState, executor);
+  }
 
-    private void fireStateChange(T newState, Executor executor)
-    {
-        requireNonNull(executor, "executor is null");
-        Set<SettableFuture<T>> futures;
-        synchronized (listeners) {
-            futures = ImmutableSet.copyOf(listeners);
-            listeners.clear();
-        }
+  private void fireStateChange(T newState, Executor executor) {
+    requireNonNull(executor, "executor is null");
+    Set<SettableFuture<T>> futures;
+    synchronized (listeners) {
+      futures = ImmutableSet.copyOf(listeners);
+      listeners.clear();
+    }
 
-        for (SettableFuture<T> future : futures) {
-            executor.execute(() -> future.set(newState));
-        }
+    for (SettableFuture<T> future : futures) {
+      executor.execute(() -> future.set(newState));
     }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0023b488e6..dd48dff9ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
@@ -34,6 +33,8 @@ import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -68,12 +69,13 @@ public class QueryExecution {
 
     // We add the abort logic inside the QueryExecution.
     // So that the other components can only focus on the state change.
-    stateMachine.addStateChangeListener(state -> {
-      if (!state.isDone()) {
-        return;
-      }
-      this.cleanup();
-    });
+    stateMachine.addStateChangeListener(
+        state -> {
+          if (!state.isDone()) {
+            return;
+          }
+          this.cleanup();
+        });
   }
 
   public void start() {
@@ -90,7 +92,8 @@ public class QueryExecution {
 
   private void schedule() {
     // TODO: (xingtanzjr) initialize the query scheduler according to configuration
-    this.scheduler = new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
+    this.scheduler =
+        new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
     // TODO: (xingtanzjr) how to make the schedule running asynchronously
     this.scheduler.start();
   }
@@ -108,7 +111,8 @@ public class QueryExecution {
   }
 
   /**
-   * Do cleanup work for current QueryExecution including QuerySchedule aborting and resource releasing
+   * Do cleanup work for current QueryExecution including QuerySchedule aborting and resource
+   * releasing
    */
   private void cleanup() {
     if (this.scheduler != null) {
@@ -117,18 +121,13 @@ public class QueryExecution {
     releaseResource();
   }
 
-  /**
-   * Release the resources that current QueryExecution hold.
-   */
-  private void releaseResource() {
-
-  }
+  /** Release the resources that the current QueryExecution holds. */
+  private void releaseResource() {}
 
   /**
    * This method will be called by the request thread from client connection. This method will block
    * until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
-   * 3. The query has been cancelled 4. The query is timeout This method wil
-   * l fetch the result from
+   * 3. The query has been cancelled 4. The query is timeout. This method will fetch the result from
    * DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
    * implemented with DataStreamManager)
    */
@@ -137,22 +136,27 @@ public class QueryExecution {
   }
 
   /**
-   * This method is a synchronized method.
-   * For READ, it will block until all the FragmentInstances have been submitted.
-   * For WRITE, it will block until all the FragmentInstances have finished.
+   * This method is synchronous (blocking). For READ, it will block until all the FragmentInstances
+   * have been submitted. For WRITE, it will block until all the FragmentInstances have finished.
+   *
    * @return ExecutionStatus. Contains the QueryId and the TSStatus.
    */
   public ExecutionStatus getStatus() {
-    // Although we monitor the state to transition to RUNNING, the future will return if any Terminated state is triggered
-    ListenableFuture<QueryState> future =  stateMachine.getStateChange(QueryState.RUNNING);
+    // Although we monitor the state to transition to RUNNING, the future will return if any
+    // Terminated state is triggered
+    ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.RUNNING);
     try {
       QueryState state = future.get();
       // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
-      TSStatusCode statusCode = state == QueryState.FINISHED ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
+      TSStatusCode statusCode =
+          state == QueryState.FINISHED
+              ? TSStatusCode.SUCCESS_STATUS
+              : TSStatusCode.QUERY_PROCESS_ERROR;
       return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(statusCode));
     } catch (InterruptedException | ExecutionException e) {
       // TODO: (xingtanzjr) use more accurate error handling
-      return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+      return new ExecutionStatus(
+          context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index b585ab28c8..383875a022 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -25,27 +25,25 @@ import java.util.stream.Stream;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 
 public enum QueryState {
-    QUEUED(false),
-    PLANNED(false),
-    DISPATCHING(false),
-    RUNNING(false),
-    FINISHED(true),
-    CANCELED(true),
-    ABORTED(true),
-    FAILED(true);
-
-    private final boolean doneState;
-
-    public static final Set<QueryState> TERMINAL_INSTANCE_STATES =
-            Stream.of(QueryState.values())
-                    .filter(QueryState::isDone)
-                    .collect(toImmutableSet());
-
-    QueryState(boolean doneState) {
-        this.doneState = doneState;
-    }
-
-    public boolean isDone() {
-        return doneState;
-    }
+  QUEUED(false),
+  PLANNED(false),
+  DISPATCHING(false),
+  RUNNING(false),
+  FINISHED(true),
+  CANCELED(true),
+  ABORTED(true),
+  FAILED(true);
+
+  private final boolean doneState;
+
+  public static final Set<QueryState> TERMINAL_INSTANCE_STATES =
+      Stream.of(QueryState.values()).filter(QueryState::isDone).collect(toImmutableSet());
+
+  QueryState(boolean doneState) {
+    this.doneState = doneState;
+  }
+
+  public boolean isDone() {
+    return doneState;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 33b9049b89..0ed499c785 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -18,13 +18,12 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
-import java.util.List;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -34,72 +33,76 @@ import java.util.concurrent.Executor;
  * register listeners when the state changes of the QueryExecution.
  */
 public class QueryStateMachine {
-    private String name;
-    private StateMachine<QueryState> queryState;
-    private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
-
-    // The executor will be used in all the state machines belonged to this query.
-    private Executor stateMachineExecutor;
-
-    public QueryStateMachine(QueryId queryId) {
-        this.name = String.format("QueryStateMachine[%s]", queryId);
-        this.stateMachineExecutor = getStateMachineExecutor();
-        this.fragInstanceStateMap = new ConcurrentHashMap<>();
-        this.queryState = new StateMachine<>(queryId.toString(), this.stateMachineExecutor ,QueryState.QUEUED, QueryState.TERMINAL_INSTANCE_STATES);
-    }
-
-    public void initialFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
-        this.fragInstanceStateMap.put(id, state);
-    }
-
-    public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
-        this.fragInstanceStateMap.put(id, state);
-    }
-
-    // TODO: (xingtanzjr) consider more suitable method for executor initialization
-    private Executor getStateMachineExecutor() {
-        return IoTDBThreadPoolFactory.newSingleThreadExecutor(name);
-    }
-
-    public void addStateChangeListener(StateMachine.StateChangeListener<QueryState> stateChangeListener)
-    {
-        queryState.addStateChangeListener(stateChangeListener);
-    }
-
-    public ListenableFuture<QueryState> getStateChange(QueryState currentState)
-    {
-        return queryState.getStateChange(currentState);
-    }
-
-    private String getName() {
-        return name;
-    }
-
-    public void transitionToPlanned() {
-        queryState.set(QueryState.PLANNED);
-    }
-
-    public void transitionToDispatching() {
-        queryState.set(QueryState.DISPATCHING);
-    }
-
-    public void transitionToRunning() {
-        queryState.set(QueryState.RUNNING);
-    }
-
-    public void transitionToFinished() {
-        queryState.set(QueryState.FINISHED);
-    }
-
-    public void transitionToCanceled() {
-        queryState.set(QueryState.CANCELED);
-    }
-
-    public void transitionToAborted() {
-        queryState.set(QueryState.ABORTED);
-    }
-
-    public void transitionToFailed() {
-        queryState.set(QueryState.FAILED);
-    }
+  private String name;
+  private StateMachine<QueryState> queryState;
+  private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
+
+  // The executor will be used in all the state machines belonging to this query.
+  private Executor stateMachineExecutor;
+
+  public QueryStateMachine(QueryId queryId) {
+    this.name = String.format("QueryStateMachine[%s]", queryId);
+    this.stateMachineExecutor = getStateMachineExecutor();
+    this.fragInstanceStateMap = new ConcurrentHashMap<>();
+    this.queryState =
+        new StateMachine<>(
+            queryId.toString(),
+            this.stateMachineExecutor,
+            QueryState.QUEUED,
+            QueryState.TERMINAL_INSTANCE_STATES);
+  }
+
+  public void initialFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
+    this.fragInstanceStateMap.put(id, state);
+  }
+
+  public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
+    this.fragInstanceStateMap.put(id, state);
+  }
+
+  // TODO: (xingtanzjr) consider more suitable method for executor initialization
+  private Executor getStateMachineExecutor() {
+    return IoTDBThreadPoolFactory.newSingleThreadExecutor(name);
+  }
+
+  public void addStateChangeListener(
+      StateMachine.StateChangeListener<QueryState> stateChangeListener) {
+    queryState.addStateChangeListener(stateChangeListener);
+  }
+
+  public ListenableFuture<QueryState> getStateChange(QueryState currentState) {
+    return queryState.getStateChange(currentState);
+  }
+
+  private String getName() {
+    return name;
+  }
+
+  public void transitionToPlanned() {
+    queryState.set(QueryState.PLANNED);
+  }
+
+  public void transitionToDispatching() {
+    queryState.set(QueryState.DISPATCHING);
+  }
+
+  public void transitionToRunning() {
+    queryState.set(QueryState.RUNNING);
+  }
+
+  public void transitionToFinished() {
+    queryState.set(QueryState.FINISHED);
+  }
+
+  public void transitionToCanceled() {
+    queryState.set(QueryState.CANCELED);
+  }
+
+  public void transitionToAborted() {
+    queryState.set(QueryState.ABORTED);
+  }
+
+  public void transitionToFailed() {
+    queryState.set(QueryState.FAILED);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
index 78d7badfea..39e663fda5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -37,288 +38,284 @@ import static java.util.Objects.requireNonNull;
  * Simple state machine which holds a single state. Callers can register for state change events.
  */
 @ThreadSafe
-public class StateMachine<T>
-{
-    private final String name;
-    private final Executor executor;
-    private final Object lock = new Object();
-    private final Set<T> terminalStates;
-
-    @GuardedBy("lock")
-    private volatile T state;
-
-    @GuardedBy("lock")
-    private final List<StateChangeListener<T>> stateChangeListeners = new ArrayList<>();
-
-    private final AtomicReference<FutureStateChange<T>> futureStateChange = new AtomicReference<>(new FutureStateChange<>());
-
-    /**
-     * Creates a state machine with the specified initial state and no terminal states.
-     *
-     * @param name name of this state machine to use in debug statements
-     * @param executor executor for firing state change events; must not be a same thread executor
-     * @param initialState the initial state
-     */
-    public StateMachine(String name, Executor executor, T initialState)
-    {
-        this(name, executor, initialState, ImmutableSet.of());
-    }
-
-    /**
-     * Creates a state machine with the specified initial state and terminal states.
-     *
-     * @param name name of this state machine to use in debug statements
-     * @param executor executor for firing state change events; must not be a same thread executor
-     * @param initialState the initial state
-     * @param terminalStates the terminal states
-     */
-    public StateMachine(String name, Executor executor, T initialState, Iterable<T> terminalStates)
-    {
-        this.name = requireNonNull(name, "name is null");
-        this.executor = requireNonNull(executor, "executor is null");
-        this.state = requireNonNull(initialState, "initialState is null");
-        this.terminalStates = ImmutableSet.copyOf(requireNonNull(terminalStates, "terminalStates is null"));
-    }
-
-    // state changes are atomic and state is volatile, so a direct read is safe here
-    @SuppressWarnings("FieldAccessNotGuarded")
-    public T get()
-    {
+public class StateMachine<T> {
+  private final String name;
+  private final Executor executor;
+  private final Object lock = new Object();
+  private final Set<T> terminalStates;
+
+  @GuardedBy("lock")
+  private volatile T state;
+
+  @GuardedBy("lock")
+  private final List<StateChangeListener<T>> stateChangeListeners = new ArrayList<>();
+
+  private final AtomicReference<FutureStateChange<T>> futureStateChange =
+      new AtomicReference<>(new FutureStateChange<>());
+
+  /**
+   * Creates a state machine with the specified initial state and no terminal states.
+   *
+   * @param name name of this state machine to use in debug statements
+   * @param executor executor for firing state change events; must not be a same thread executor
+   * @param initialState the initial state
+   */
+  public StateMachine(String name, Executor executor, T initialState) {
+    this(name, executor, initialState, ImmutableSet.of());
+  }
+
+  /**
+   * Creates a state machine with the specified initial state and terminal states.
+   *
+   * @param name name of this state machine to use in debug statements
+   * @param executor executor for firing state change events; must not be a same thread executor
+   * @param initialState the initial state
+   * @param terminalStates the terminal states
+   */
+  public StateMachine(String name, Executor executor, T initialState, Iterable<T> terminalStates) {
+    this.name = requireNonNull(name, "name is null");
+    this.executor = requireNonNull(executor, "executor is null");
+    this.state = requireNonNull(initialState, "initialState is null");
+    this.terminalStates =
+        ImmutableSet.copyOf(requireNonNull(terminalStates, "terminalStates is null"));
+  }
+
+  // state changes are atomic and state is volatile, so a direct read is safe here
+  @SuppressWarnings("FieldAccessNotGuarded")
+  public T get() {
+    return state;
+  }
+
+  /**
+   * Sets the state. If the new state does not {@code .equals()} the current state, listeners and
+   * waiters will be notified.
+   *
+   * @return the old state
+   * @throws IllegalStateException if state change would cause a transition from a terminal state
+   */
+  public T set(T newState) {
+    T oldState = trySet(newState);
+    checkState(
+        oldState.equals(newState) || !isTerminalState(oldState),
+        "%s cannot transition from %s to %s",
+        name,
+        state,
+        newState);
+    return oldState;
+  }
+
+  /**
+   * Tries to change the state. State will not change if the new state {@code .equals()} the current
+   * state, or if the current state is a terminal state. If the state changed, listeners and waiters
+   * will be notified.
+   *
+   * @return the state before the possible state change
+   */
+  public T trySet(T newState) {
+    checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+    requireNonNull(newState, "newState is null");
+
+    T oldState;
+    FutureStateChange<T> futureStateChange;
+    ImmutableList<StateChangeListener<T>> stateChangeListeners;
+    synchronized (lock) {
+      if (state.equals(newState) || isTerminalState(state)) {
         return state;
-    }
+      }
 
-    /**
-     * Sets the state.
-     * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
-     *
-     * @return the old state
-     * @throws IllegalStateException if state change would cause a transition from a terminal state
-     */
-    public T set(T newState)
-    {
-        T oldState = trySet(newState);
-        checkState(oldState.equals(newState) || !isTerminalState(oldState), "%s cannot transition from %s to %s", name, state, newState);
-        return oldState;
-    }
+      oldState = state;
+      state = newState;
 
-    /**
-     * Tries to change the state.  State will not change if the new state {@code .equals()} the current state,
-     * of if the current state is a terminal state. If the state changed, listeners and waiters will be notified.
-     *
-     * @return the state before the possible state change
-     */
-    public T trySet(T newState)
-    {
-        checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
-        requireNonNull(newState, "newState is null");
-
-        T oldState;
-        FutureStateChange<T> futureStateChange;
-        ImmutableList<StateChangeListener<T>> stateChangeListeners;
-        synchronized (lock) {
-            if (state.equals(newState) || isTerminalState(state)) {
-                return state;
-            }
-
-            oldState = state;
-            state = newState;
-
-            futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
-            stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
-
-            // if we are now in a terminal state, free the listeners since this will be the last notification
-            if (isTerminalState(state)) {
-                this.stateChangeListeners.clear();
-            }
-        }
-
-        fireStateChanged(newState, futureStateChange, stateChangeListeners);
-        return oldState;
-    }
+      futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
+      stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
 
-    /**
-     * Sets the state if the current state satisfies the specified predicate.
-     * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
-     *
-     * @return true if the state is set
-     */
-    public boolean setIf(T newState, Predicate<T> predicate)
-    {
-        checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
-        requireNonNull(newState, "newState is null");
-
-        while (true) {
-            // check if the current state passes the predicate
-            T currentState = get();
-
-            // change to same state is not a change, and does not notify the notify listeners
-            if (currentState.equals(newState)) {
-                return false;
-            }
-
-            // do not call predicate while holding the lock
-            if (!predicate.test(currentState)) {
-                return false;
-            }
-
-            // if state did not change while, checking the predicate, apply the new state
-            if (compareAndSet(currentState, newState)) {
-                return true;
-            }
-        }
+      // if we are now in a terminal state, free the listeners since this will be the last
+      // notification
+      if (isTerminalState(state)) {
+        this.stateChangeListeners.clear();
+      }
     }
 
-    /**
-     * Sets the state if the current state {@code .equals()} the specified expected state.
-     * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
-     *
-     * @return true if the state is set
-     */
-    public boolean compareAndSet(T expectedState, T newState)
-    {
-        checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
-        requireNonNull(expectedState, "expectedState is null");
-        requireNonNull(newState, "newState is null");
-
-        FutureStateChange<T> futureStateChange;
-        ImmutableList<StateChangeListener<T>> stateChangeListeners;
-        synchronized (lock) {
-            if (!state.equals(expectedState)) {
-                return false;
-            }
-
-            // change to same state is not a change, and does not notify the notify listeners
-            if (state.equals(newState)) {
-                return false;
-            }
-
-            checkState(!isTerminalState(state), "%s cannot transition from %s to %s", name, state, newState);
-
-            state = newState;
-
-            futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
-            stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
-
-            // if we are now in a terminal state, free the listeners since this will be the last notification
-            if (isTerminalState(state)) {
-                this.stateChangeListeners.clear();
-            }
-        }
-
-        fireStateChanged(newState, futureStateChange, stateChangeListeners);
+    fireStateChanged(newState, futureStateChange, stateChangeListeners);
+    return oldState;
+  }
+
+  /**
+   * Sets the state if the current state satisfies the specified predicate. If the new state does
+   * not {@code .equals()} the current state, listeners and waiters will be notified.
+   *
+   * @return true if the state is set
+   */
+  public boolean setIf(T newState, Predicate<T> predicate) {
+    checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+    requireNonNull(newState, "newState is null");
+
+    while (true) {
+      // check if the current state passes the predicate
+      T currentState = get();
+
+      // change to same state is not a change, and does not notify the listeners
+      if (currentState.equals(newState)) {
+        return false;
+      }
+
+      // do not call predicate while holding the lock
+      if (!predicate.test(currentState)) {
+        return false;
+      }
+
+      // if the state did not change while checking the predicate, apply the new state
+      if (compareAndSet(currentState, newState)) {
         return true;
+      }
     }
-
-    private void fireStateChanged(T newState, FutureStateChange<T> futureStateChange, List<StateChangeListener<T>> stateChangeListeners)
-    {
-        checkState(!Thread.holdsLock(lock), "Cannot fire state change event while holding the lock");
-        requireNonNull(newState, "newState is null");
-
-        // always fire listener callbacks from a different thread
-        safeExecute(() -> {
-            checkState(!Thread.holdsLock(lock), "Cannot notify while holding the lock");
-            try {
-                futureStateChange.complete(newState);
-            }
-            catch (Throwable e) {
-//                log.error(e, "Error setting future state for %s", name);
-            }
-            for (StateChangeListener<T> stateChangeListener : stateChangeListeners) {
-                fireStateChangedListener(newState, stateChangeListener);
-            }
-        });
+  }
+
+  /**
+   * Sets the state if the current state {@code .equals()} the specified expected state. If the new
+   * state does not {@code .equals()} the current state, listeners and waiters will be notified.
+   *
+   * @return true if the state is set
+   */
+  public boolean compareAndSet(T expectedState, T newState) {
+    checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+    requireNonNull(expectedState, "expectedState is null");
+    requireNonNull(newState, "newState is null");
+
+    FutureStateChange<T> futureStateChange;
+    ImmutableList<StateChangeListener<T>> stateChangeListeners;
+    synchronized (lock) {
+      if (!state.equals(expectedState)) {
+        return false;
+      }
+
+      // change to same state is not a change, and does not notify the listeners
+      if (state.equals(newState)) {
+        return false;
+      }
+
+      checkState(
+          !isTerminalState(state), "%s cannot transition from %s to %s", name, state, newState);
+
+      state = newState;
+
+      futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
+      stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
+
+      // if we are now in a terminal state, free the listeners since this will be the last
+      // notification
+      if (isTerminalState(state)) {
+        this.stateChangeListeners.clear();
+      }
     }
 
-    private void fireStateChangedListener(T newState, StateChangeListener<T> stateChangeListener)
-    {
-        try {
-            stateChangeListener.stateChanged(newState);
-        }
-        catch (Throwable e) {
-//            log.error(e, "Error notifying state change listener for %s", name);
-        }
-    }
+    fireStateChanged(newState, futureStateChange, stateChangeListeners);
+    return true;
+  }
+
+  private void fireStateChanged(
+      T newState,
+      FutureStateChange<T> futureStateChange,
+      List<StateChangeListener<T>> stateChangeListeners) {
+    checkState(!Thread.holdsLock(lock), "Cannot fire state change event while holding the lock");
+    requireNonNull(newState, "newState is null");
+
+    // always fire listener callbacks from a different thread
+    safeExecute(
+        () -> {
+          checkState(!Thread.holdsLock(lock), "Cannot notify while holding the lock");
+          try {
+            futureStateChange.complete(newState);
+          } catch (Throwable e) {
+            //                log.error(e, "Error setting future state for %s", name);
+          }
+          for (StateChangeListener<T> stateChangeListener : stateChangeListeners) {
+            fireStateChangedListener(newState, stateChangeListener);
+          }
+        });
+  }
 
-    /**
-     * Gets a future that completes when the state is no longer {@code .equals()} to {@code currentState)}.
-     */
-    public ListenableFuture<T> getStateChange(T currentState)
-    {
-        checkState(!Thread.holdsLock(lock), "Cannot wait for state change while holding the lock");
-        requireNonNull(currentState, "currentState is null");
-
-        synchronized (lock) {
-            // return a completed future if the state has already changed, or we are in a terminal state
-            if (!state.equals(currentState) || isTerminalState(state)) {
-                return immediateFuture(state);
-            }
-
-            return futureStateChange.get().createNewListener();
-        }
+  private void fireStateChangedListener(T newState, StateChangeListener<T> stateChangeListener) {
+    try {
+      stateChangeListener.stateChanged(newState);
+    } catch (Throwable e) {
+      //            log.error(e, "Error notifying state change listener for %s", name);
     }
-
-    /**
-     * Adds a listener to be notified when the state instance changes according to {@code .equals()}.
-     * Listener is always notified asynchronously using a dedicated notification thread pool so, care should
-     * be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
-     * possible notifications are observed out of order due to the asynchronous execution. The listener is
-     * immediately notified immediately of the current state.
-     */
-    public void addStateChangeListener(StateChangeListener<T> stateChangeListener)
-    {
-        requireNonNull(stateChangeListener, "stateChangeListener is null");
-
-        boolean inTerminalState;
-        T currentState;
-        synchronized (lock) {
-            currentState = state;
-            inTerminalState = isTerminalState(currentState);
-            if (!inTerminalState) {
-                stateChangeListeners.add(stateChangeListener);
-            }
-        }
-
-        // fire state change listener with the current state
-        // always fire listener callbacks from a different thread
-        safeExecute(() -> stateChangeListener.stateChanged(currentState));
+  }
+
+  /**
+   * Gets a future that completes when the state is no longer {@code .equals()} to {@code
+   * currentState}.
+   */
+  public ListenableFuture<T> getStateChange(T currentState) {
+    checkState(!Thread.holdsLock(lock), "Cannot wait for state change while holding the lock");
+    requireNonNull(currentState, "currentState is null");
+
+    synchronized (lock) {
+      // return a completed future if the state has already changed, or we are in a terminal state
+      if (!state.equals(currentState) || isTerminalState(state)) {
+        return immediateFuture(state);
+      }
+
+      return futureStateChange.get().createNewListener();
     }
-
-    @VisibleForTesting
-    boolean isTerminalState(T state)
-    {
-        return terminalStates.contains(state);
+  }
+
+  /**
+   * Adds a listener to be notified when the state instance changes according to {@code .equals()}.
+   * Listener is always notified asynchronously using a dedicated notification thread pool so, care
+   * should be taken to avoid leaking {@code this} when adding a listener in a constructor.
+   * Additionally, it is possible notifications are observed out of order due to the asynchronous
+   * execution. The listener is immediately notified of the current state.
+   */
+  public void addStateChangeListener(StateChangeListener<T> stateChangeListener) {
+    requireNonNull(stateChangeListener, "stateChangeListener is null");
+
+    boolean inTerminalState;
+    T currentState;
+    synchronized (lock) {
+      currentState = state;
+      inTerminalState = isTerminalState(currentState);
+      if (!inTerminalState) {
+        stateChangeListeners.add(stateChangeListener);
+      }
     }
 
-    @VisibleForTesting
-    List<StateChangeListener<T>> getStateChangeListeners()
-    {
-        synchronized (lock) {
-            return ImmutableList.copyOf(stateChangeListeners);
-        }
-    }
+    // fire state change listener with the current state
+    // always fire listener callbacks from a different thread
+    safeExecute(() -> stateChangeListener.stateChanged(currentState));
+  }
 
-    public interface StateChangeListener<T>
-    {
-        void stateChanged(T newState);
-    }
+  @VisibleForTesting
+  boolean isTerminalState(T state) {
+    return terminalStates.contains(state);
+  }
 
-    @Override
-    public String toString()
-    {
-        return get().toString();
+  @VisibleForTesting
+  List<StateChangeListener<T>> getStateChangeListeners() {
+    synchronized (lock) {
+      return ImmutableList.copyOf(stateChangeListeners);
     }
-
-    private void safeExecute(Runnable command)
-    {
-        try {
-            executor.execute(command);
-        }
-        catch (RejectedExecutionException e) {
-            if ((executor instanceof ExecutorService) && ((ExecutorService) executor).isShutdown()) {
-                // TODO: (xingtanzjr) handle the exception
-                throw new RuntimeException("Server is shutting down", e);
-            }
-            throw e;
-        }
+  }
+
+  public interface StateChangeListener<T> {
+    void stateChanged(T newState);
+  }
+
+  @Override
+  public String toString() {
+    return get().toString();
+  }
+
+  private void safeExecute(Runnable command) {
+    try {
+      executor.execute(command);
+    } catch (RejectedExecutionException e) {
+      if ((executor instanceof ExecutorService) && ((ExecutorService) executor).isShutdown()) {
+        // TODO: (xingtanzjr) handle the exception
+        throw new RuntimeException("Server is shutting down", e);
+      }
+      throw e;
     }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
index 6f283b8994..de842f0207 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
+
 import org.apache.thrift.TException;
 
 import java.util.List;
@@ -33,28 +34,32 @@ import java.util.concurrent.ExecutorService;
 
 public abstract class AbstractFragInsStateFetcher implements IFragInstanceStateFetcher {
 
-    protected QueryStateMachine stateMachine;
-    protected ExecutorService executor;
-    protected List<FragmentInstance> instances;
-
-    public AbstractFragInsStateFetcher(QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
-        this.stateMachine = stateMachine;
-        this.executor = executor;
-        this.instances = instances;
-    }
+  protected QueryStateMachine stateMachine;
+  protected ExecutorService executor;
+  protected List<FragmentInstance> instances;
 
-    public abstract void start();
+  public AbstractFragInsStateFetcher(
+      QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+    this.stateMachine = stateMachine;
+    this.executor = executor;
+    this.instances = instances;
+  }
 
-    protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
-        InternalService.Client client = InternalServiceClientFactory
-                .getInternalServiceClient(instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
-        TFragmentInstanceStateResp resp = client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
-        return FragmentInstanceState.valueOf(resp.state);
-    }
+  public abstract void start();
 
-    private TFragmentInstanceId getTId(FragmentInstance instance) {
-        return new TFragmentInstanceId(instance.getId().getQueryId().getId(),
-                String.valueOf(instance.getId().getFragmentId().getId()), instance.getId().getInstanceId());
-    }
+  protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
+    InternalService.Client client =
+        InternalServiceClientFactory.getInternalServiceClient(
+            instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+    TFragmentInstanceStateResp resp =
+        client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+    return FragmentInstanceState.valueOf(resp.state);
+  }
 
+  private TFragmentInstanceId getTId(FragmentInstance instance) {
+    return new TFragmentInstanceId(
+        instance.getId().getQueryId().getId(),
+        String.valueOf(instance.getId().getFragmentId().getId()),
+        instance.getId().getInstanceId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index c4050a3045..8c6c2991bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import io.airlift.units.Duration;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
@@ -27,6 +26,8 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
+import io.airlift.units.Duration;
+
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -46,7 +47,8 @@ public class ClusterScheduler implements IScheduler {
   private List<FragmentInstance> instances;
   private IFragInstanceDispatcher dispatcher;
 
-  public ClusterScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
+  public ClusterScheduler(
+      QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
     this.stateMachine = stateMachine;
     this.instances = instances;
     this.queryType = queryType;
@@ -74,11 +76,13 @@ public class ClusterScheduler implements IScheduler {
       return;
     }
 
-    // The FragmentInstances has been dispatched successfully to corresponding host, we mark the QueryState to Running
+    // The FragmentInstances has been dispatched successfully to corresponding host, we mark the
+    // QueryState to Running
     stateMachine.transitionToRunning();
-    instances.forEach(instance -> {
-      stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
-    });
+    instances.forEach(
+        instance -> {
+          stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
+        });
 
     // TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
index 583f2503f1..9c0bc9e807 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
@@ -22,40 +22,44 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+
 import org.apache.thrift.TException;
 
 import java.util.List;
 import java.util.concurrent.*;
 
 public class FixedRateFragInsStateFetcher extends AbstractFragInsStateFetcher {
-    private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
-    private volatile boolean aborted = false;
+  private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
+  private volatile boolean aborted = false;
 
-    public FixedRateFragInsStateFetcher(QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
-        super(stateMachine, executor, instances);
-    }
+  public FixedRateFragInsStateFetcher(
+      QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+    super(stateMachine, executor, instances);
+  }
 
-    @Override
-    public void start() {
-        while (!aborted) {
-            try {
-                Future<Boolean> future = executor.submit(() -> {
-                    for (FragmentInstance instance : instances) {
-                        try {
-                            FragmentInstanceState state = fetchState(instance);
-                            stateMachine.updateFragInstanceState(instance.getId(), state);
-                        } catch (TException e) {
-                            // TODO: do nothing ?
-                            return false;
-                        }
+  @Override
+  public void start() {
+    while (!aborted) {
+      try {
+        Future<Boolean> future =
+            executor.submit(
+                () -> {
+                  for (FragmentInstance instance : instances) {
+                    try {
+                      FragmentInstanceState state = fetchState(instance);
+                      stateMachine.updateFragInstanceState(instance.getId(), state);
+                    } catch (TException e) {
+                      // TODO: do nothing ?
+                      return false;
                     }
-                    return true;
+                  }
+                  return true;
                 });
-                future.get();
-                Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
-            } catch (InterruptedException | ExecutionException e) {
-                // TODO:
-            }
-        }
+        future.get();
+        Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
+      } catch (InterruptedException | ExecutionException e) {
+        // TODO:
+      }
     }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
index 8108c9c0c0..fbafb7a34b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
@@ -20,13 +20,13 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 public class FragInstanceDispatchResult {
-    private boolean successful;
+  private boolean successful;
 
-    public FragInstanceDispatchResult(boolean successful) {
-        this.successful = successful;
-    }
+  public FragInstanceDispatchResult(boolean successful) {
+    this.successful = successful;
+  }
 
-    public boolean isSuccessful() {
-        return successful;
-    }
+  public boolean isSuccessful() {
+    return successful;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
index 9fdedabb30..c9574fdb4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
@@ -25,12 +25,13 @@ import java.util.List;
 import java.util.concurrent.Future;
 
 public interface IFragInstanceDispatcher {
-    /**
-     * Dispatch all Fragment instances asynchronously
-     * @param instances Fragment instance list
-     * @return Boolean.
-     */
-    Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
+  /**
+   * Dispatch all Fragment instances asynchronously
+   *
+   * @param instances Fragment instance list
+   * @return a future that yields the dispatch result
+   */
+  Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
 
-    void abort();
+  void abort();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
index 33d0d5343f..3c14ee27ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
@@ -20,5 +20,5 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 public interface IFragInstanceStateFetcher {
-    void start();
+  void start();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
index d79b66ca6b..8dfc825c0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TSocket;
@@ -27,11 +28,12 @@ import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
 public class InternalServiceClientFactory {
-    // TODO: (xingtanzjr) consider the best practice to maintain the clients
-    public static InternalService.Client getInternalServiceClient(String endpoint, int port) throws TTransportException {
-        TTransport transport = new TSocket(endpoint, port);
-        transport.open();
-        TProtocol protocol = new TBinaryProtocol(transport);
-        return new InternalService.Client(protocol);
-    }
+  // TODO: (xingtanzjr) consider the best practice to maintain the clients
+  public static InternalService.Client getInternalServiceClient(String endpoint, int port)
+      throws TTransportException {
+    TTransport transport = new TSocket(endpoint, port);
+    transport.open();
+    TProtocol protocol = new TBinaryProtocol(transport);
+    return new InternalService.Client(protocol);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index a48fc7b7c3..f47d3df45e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+
 import org.apache.thrift.TException;
 
 import java.util.List;
@@ -29,27 +30,28 @@ import java.util.concurrent.FutureTask;
 
 public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
 
-    @Override
-    public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
-        FutureTask<FragInstanceDispatchResult> dispatchTask = new FutureTask<>(() -> {
-            try {
+  @Override
+  public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
+    FutureTask<FragInstanceDispatchResult> dispatchTask =
+        new FutureTask<>(
+            () -> {
+              try {
                 for (FragmentInstance instance : instances) {
-                    InternalService.Client client = InternalServiceClientFactory.
-                            getInternalServiceClient(instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
-                    client.sendFragmentInstance(null);
+                  InternalService.Client client =
+                      InternalServiceClientFactory.getInternalServiceClient(
+                          instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+                  client.sendFragmentInstance(null);
                 }
-            } catch (TException e) {
+              } catch (TException e) {
                 // TODO: (xingtanzjr) add more details
                 return new FragInstanceDispatchResult(false);
-            }
-            return new FragInstanceDispatchResult(true);
-        });
-        new Thread(dispatchTask).start();
-        return dispatchTask;
-    }
-
-    @Override
-    public void abort() {
+              }
+              return new FragInstanceDispatchResult(true);
+            });
+    new Thread(dispatchTask).start();
+    return dispatchTask;
+  }
 
-    }
+  @Override
+  public void abort() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 26562059ab..c1085e15d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
-import org.apache.commons.lang.Validate;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -26,6 +25,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang.Validate;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -57,7 +57,9 @@ public class FragmentSinkNode extends SinkNode {
 
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
-    Validate.isTrue(children == null || children.size() == 1, "Children size of FragmentSinkNode should be 0 or 1");
+    Validate.isTrue(
+        children == null || children.size() == 1,
+        "Children size of FragmentSinkNode should be 0 or 1");
     FragmentSinkNode sinkNode = (FragmentSinkNode) clone();
     if (children != null) {
       sinkNode.setChild(children.get(0));