You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 03:39:44 UTC
[iotdb] 02/04: spotless
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e01ff0c86c0ca9e0582c20cb2f75e93d7c9db295
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 10:27:14 2022 +0800
spotless
---
.../iotdb/db/mpp/execution/FutureStateChange.java | 76 ++-
.../iotdb/db/mpp/execution/QueryExecution.java | 52 +-
.../apache/iotdb/db/mpp/execution/QueryState.java | 44 +-
.../iotdb/db/mpp/execution/QueryStateMachine.java | 145 +++---
.../iotdb/db/mpp/execution/StateMachine.java | 527 ++++++++++-----------
.../scheduler/AbstractFragInsStateFetcher.java | 45 +-
.../mpp/execution/scheduler/ClusterScheduler.java | 16 +-
.../scheduler/FixedRateFragInsStateFetcher.java | 54 ++-
.../scheduler/FragInstanceDispatchResult.java | 14 +-
.../scheduler/IFragInstanceDispatcher.java | 15 +-
.../scheduler/IFragInstanceStateFetcher.java | 2 +-
.../scheduler/InternalServiceClientFactory.java | 16 +-
.../scheduler/SimpleFragInstanceDispatcher.java | 38 +-
.../planner/plan/node/sink/FragmentSinkNode.java | 6 +-
14 files changed, 534 insertions(+), 516 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
index 55fc2da276..0d33be9b16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
@@ -19,6 +19,7 @@ import com.google.common.util.concurrent.SettableFuture;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
+
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -27,52 +28,47 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;
@ThreadSafe
-public class FutureStateChange<T>
-{
- // Use a separate future for each listener so canceled listeners can be removed
- @GuardedBy("listeners")
- private final Set<SettableFuture<T>> listeners = new HashSet<>();
+public class FutureStateChange<T> {
+ // Use a separate future for each listener so canceled listeners can be removed
+ @GuardedBy("listeners")
+ private final Set<SettableFuture<T>> listeners = new HashSet<>();
- public ListenableFuture<T> createNewListener()
- {
- SettableFuture<T> listener = SettableFuture.create();
- synchronized (listeners) {
- listeners.add(listener);
- }
+ public ListenableFuture<T> createNewListener() {
+ SettableFuture<T> listener = SettableFuture.create();
+ synchronized (listeners) {
+ listeners.add(listener);
+ }
- // remove the listener when the future completes
- listener.addListener(
- () -> {
- synchronized (listeners) {
- listeners.remove(listener);
- }
- },
- directExecutor());
+ // remove the listener when the future completes
+ listener.addListener(
+ () -> {
+ synchronized (listeners) {
+ listeners.remove(listener);
+ }
+ },
+ directExecutor());
- return listener;
- }
+ return listener;
+ }
- public void complete(T newState)
- {
- fireStateChange(newState, directExecutor());
- }
+ public void complete(T newState) {
+ fireStateChange(newState, directExecutor());
+ }
- public void complete(T newState, Executor executor)
- {
- fireStateChange(newState, executor);
- }
+ public void complete(T newState, Executor executor) {
+ fireStateChange(newState, executor);
+ }
- private void fireStateChange(T newState, Executor executor)
- {
- requireNonNull(executor, "executor is null");
- Set<SettableFuture<T>> futures;
- synchronized (listeners) {
- futures = ImmutableSet.copyOf(listeners);
- listeners.clear();
- }
+ private void fireStateChange(T newState, Executor executor) {
+ requireNonNull(executor, "executor is null");
+ Set<SettableFuture<T>> futures;
+ synchronized (listeners) {
+ futures = ImmutableSet.copyOf(listeners);
+ listeners.clear();
+ }
- for (SettableFuture<T> future : futures) {
- executor.execute(() -> future.set(newState));
- }
+ for (SettableFuture<T> future : futures) {
+ executor.execute(() -> future.set(newState));
}
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0023b488e6..dd48dff9ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
@@ -34,6 +33,8 @@ import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -68,12 +69,13 @@ public class QueryExecution {
// We add the abort logic inside the QueryExecution.
// So that the other components can only focus on the state change.
- stateMachine.addStateChangeListener(state -> {
- if (!state.isDone()) {
- return;
- }
- this.cleanup();
- });
+ stateMachine.addStateChangeListener(
+ state -> {
+ if (!state.isDone()) {
+ return;
+ }
+ this.cleanup();
+ });
}
public void start() {
@@ -90,7 +92,8 @@ public class QueryExecution {
private void schedule() {
// TODO: (xingtanzjr) initialize the query scheduler according to configuration
- this.scheduler = new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
+ this.scheduler =
+ new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
// TODO: (xingtanzjr) how to make the schedule running asynchronously
this.scheduler.start();
}
@@ -108,7 +111,8 @@ public class QueryExecution {
}
/**
- * Do cleanup work for current QueryExecution including QuerySchedule aborting and resource releasing
+ * Do cleanup work for current QueryExecution including QuerySchedule aborting and resource
+ * releasing
*/
private void cleanup() {
if (this.scheduler != null) {
@@ -117,18 +121,13 @@ public class QueryExecution {
releaseResource();
}
- /**
- * Release the resources that current QueryExecution hold.
- */
- private void releaseResource() {
-
- }
+ /** Release the resources that current QueryExecution hold. */
+ private void releaseResource() {}
/**
* This method will be called by the request thread from client connection. This method will block
* until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
- * 3. The query has been cancelled 4. The query is timeout This method wil
- * l fetch the result from
+ * 3. The query has been cancelled 4. The query is timeout This method wil l fetch the result from
* DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
* implemented with DataStreamManager)
*/
@@ -137,22 +136,27 @@ public class QueryExecution {
}
/**
- * This method is a synchronized method.
- * For READ, it will block until all the FragmentInstances have been submitted.
- * For WRITE, it will block until all the FragmentInstances have finished.
+ * This method is a synchronized method. For READ, it will block until all the FragmentInstances
+ * have been submitted. For WRITE, it will block until all the FragmentInstances have finished.
+ *
* @return ExecutionStatus. Contains the QueryId and the TSStatus.
*/
public ExecutionStatus getStatus() {
- // Although we monitor the state to transition to RUNNING, the future will return if any Terminated state is triggered
- ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.RUNNING);
+ // Although we monitor the state to transition to RUNNING, the future will return if any
+ // Terminated state is triggered
+ ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.RUNNING);
try {
QueryState state = future.get();
// TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
- TSStatusCode statusCode = state == QueryState.FINISHED ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
+ TSStatusCode statusCode =
+ state == QueryState.FINISHED
+ ? TSStatusCode.SUCCESS_STATUS
+ : TSStatusCode.QUERY_PROCESS_ERROR;
return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(statusCode));
} catch (InterruptedException | ExecutionException e) {
// TODO: (xingtanzjr) use more accurate error handling
- return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ return new ExecutionStatus(
+ context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index b585ab28c8..383875a022 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -25,27 +25,25 @@ import java.util.stream.Stream;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
public enum QueryState {
- QUEUED(false),
- PLANNED(false),
- DISPATCHING(false),
- RUNNING(false),
- FINISHED(true),
- CANCELED(true),
- ABORTED(true),
- FAILED(true);
-
- private final boolean doneState;
-
- public static final Set<QueryState> TERMINAL_INSTANCE_STATES =
- Stream.of(QueryState.values())
- .filter(QueryState::isDone)
- .collect(toImmutableSet());
-
- QueryState(boolean doneState) {
- this.doneState = doneState;
- }
-
- public boolean isDone() {
- return doneState;
- }
+ QUEUED(false),
+ PLANNED(false),
+ DISPATCHING(false),
+ RUNNING(false),
+ FINISHED(true),
+ CANCELED(true),
+ ABORTED(true),
+ FAILED(true);
+
+ private final boolean doneState;
+
+ public static final Set<QueryState> TERMINAL_INSTANCE_STATES =
+ Stream.of(QueryState.values()).filter(QueryState::isDone).collect(toImmutableSet());
+
+ QueryState(boolean doneState) {
+ this.doneState = doneState;
+ }
+
+ public boolean isDone() {
+ return doneState;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 33b9049b89..0ed499c785 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -18,13 +18,12 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import java.util.List;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -34,72 +33,76 @@ import java.util.concurrent.Executor;
* register listeners when the state changes of the QueryExecution.
*/
public class QueryStateMachine {
- private String name;
- private StateMachine<QueryState> queryState;
- private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
-
- // The executor will be used in all the state machines belonged to this query.
- private Executor stateMachineExecutor;
-
- public QueryStateMachine(QueryId queryId) {
- this.name = String.format("QueryStateMachine[%s]", queryId);
- this.stateMachineExecutor = getStateMachineExecutor();
- this.fragInstanceStateMap = new ConcurrentHashMap<>();
- this.queryState = new StateMachine<>(queryId.toString(), this.stateMachineExecutor ,QueryState.QUEUED, QueryState.TERMINAL_INSTANCE_STATES);
- }
-
- public void initialFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
- this.fragInstanceStateMap.put(id, state);
- }
-
- public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
- this.fragInstanceStateMap.put(id, state);
- }
-
- // TODO: (xingtanzjr) consider more suitable method for executor initialization
- private Executor getStateMachineExecutor() {
- return IoTDBThreadPoolFactory.newSingleThreadExecutor(name);
- }
-
- public void addStateChangeListener(StateMachine.StateChangeListener<QueryState> stateChangeListener)
- {
- queryState.addStateChangeListener(stateChangeListener);
- }
-
- public ListenableFuture<QueryState> getStateChange(QueryState currentState)
- {
- return queryState.getStateChange(currentState);
- }
-
- private String getName() {
- return name;
- }
-
- public void transitionToPlanned() {
- queryState.set(QueryState.PLANNED);
- }
-
- public void transitionToDispatching() {
- queryState.set(QueryState.DISPATCHING);
- }
-
- public void transitionToRunning() {
- queryState.set(QueryState.RUNNING);
- }
-
- public void transitionToFinished() {
- queryState.set(QueryState.FINISHED);
- }
-
- public void transitionToCanceled() {
- queryState.set(QueryState.CANCELED);
- }
-
- public void transitionToAborted() {
- queryState.set(QueryState.ABORTED);
- }
-
- public void transitionToFailed() {
- queryState.set(QueryState.FAILED);
- }
+ private String name;
+ private StateMachine<QueryState> queryState;
+ private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
+
+ // The executor will be used in all the state machines belonged to this query.
+ private Executor stateMachineExecutor;
+
+ public QueryStateMachine(QueryId queryId) {
+ this.name = String.format("QueryStateMachine[%s]", queryId);
+ this.stateMachineExecutor = getStateMachineExecutor();
+ this.fragInstanceStateMap = new ConcurrentHashMap<>();
+ this.queryState =
+ new StateMachine<>(
+ queryId.toString(),
+ this.stateMachineExecutor,
+ QueryState.QUEUED,
+ QueryState.TERMINAL_INSTANCE_STATES);
+ }
+
+ public void initialFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
+ this.fragInstanceStateMap.put(id, state);
+ }
+
+ public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
+ this.fragInstanceStateMap.put(id, state);
+ }
+
+ // TODO: (xingtanzjr) consider more suitable method for executor initialization
+ private Executor getStateMachineExecutor() {
+ return IoTDBThreadPoolFactory.newSingleThreadExecutor(name);
+ }
+
+ public void addStateChangeListener(
+ StateMachine.StateChangeListener<QueryState> stateChangeListener) {
+ queryState.addStateChangeListener(stateChangeListener);
+ }
+
+ public ListenableFuture<QueryState> getStateChange(QueryState currentState) {
+ return queryState.getStateChange(currentState);
+ }
+
+ private String getName() {
+ return name;
+ }
+
+ public void transitionToPlanned() {
+ queryState.set(QueryState.PLANNED);
+ }
+
+ public void transitionToDispatching() {
+ queryState.set(QueryState.DISPATCHING);
+ }
+
+ public void transitionToRunning() {
+ queryState.set(QueryState.RUNNING);
+ }
+
+ public void transitionToFinished() {
+ queryState.set(QueryState.FINISHED);
+ }
+
+ public void transitionToCanceled() {
+ queryState.set(QueryState.CANCELED);
+ }
+
+ public void transitionToAborted() {
+ queryState.set(QueryState.ABORTED);
+ }
+
+ public void transitionToFailed() {
+ queryState.set(QueryState.FAILED);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
index 78d7badfea..39e663fda5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -37,288 +38,284 @@ import static java.util.Objects.requireNonNull;
* Simple state machine which holds a single state. Callers can register for state change events.
*/
@ThreadSafe
-public class StateMachine<T>
-{
- private final String name;
- private final Executor executor;
- private final Object lock = new Object();
- private final Set<T> terminalStates;
-
- @GuardedBy("lock")
- private volatile T state;
-
- @GuardedBy("lock")
- private final List<StateChangeListener<T>> stateChangeListeners = new ArrayList<>();
-
- private final AtomicReference<FutureStateChange<T>> futureStateChange = new AtomicReference<>(new FutureStateChange<>());
-
- /**
- * Creates a state machine with the specified initial state and no terminal states.
- *
- * @param name name of this state machine to use in debug statements
- * @param executor executor for firing state change events; must not be a same thread executor
- * @param initialState the initial state
- */
- public StateMachine(String name, Executor executor, T initialState)
- {
- this(name, executor, initialState, ImmutableSet.of());
- }
-
- /**
- * Creates a state machine with the specified initial state and terminal states.
- *
- * @param name name of this state machine to use in debug statements
- * @param executor executor for firing state change events; must not be a same thread executor
- * @param initialState the initial state
- * @param terminalStates the terminal states
- */
- public StateMachine(String name, Executor executor, T initialState, Iterable<T> terminalStates)
- {
- this.name = requireNonNull(name, "name is null");
- this.executor = requireNonNull(executor, "executor is null");
- this.state = requireNonNull(initialState, "initialState is null");
- this.terminalStates = ImmutableSet.copyOf(requireNonNull(terminalStates, "terminalStates is null"));
- }
-
- // state changes are atomic and state is volatile, so a direct read is safe here
- @SuppressWarnings("FieldAccessNotGuarded")
- public T get()
- {
+public class StateMachine<T> {
+ private final String name;
+ private final Executor executor;
+ private final Object lock = new Object();
+ private final Set<T> terminalStates;
+
+ @GuardedBy("lock")
+ private volatile T state;
+
+ @GuardedBy("lock")
+ private final List<StateChangeListener<T>> stateChangeListeners = new ArrayList<>();
+
+ private final AtomicReference<FutureStateChange<T>> futureStateChange =
+ new AtomicReference<>(new FutureStateChange<>());
+
+ /**
+ * Creates a state machine with the specified initial state and no terminal states.
+ *
+ * @param name name of this state machine to use in debug statements
+ * @param executor executor for firing state change events; must not be a same thread executor
+ * @param initialState the initial state
+ */
+ public StateMachine(String name, Executor executor, T initialState) {
+ this(name, executor, initialState, ImmutableSet.of());
+ }
+
+ /**
+ * Creates a state machine with the specified initial state and terminal states.
+ *
+ * @param name name of this state machine to use in debug statements
+ * @param executor executor for firing state change events; must not be a same thread executor
+ * @param initialState the initial state
+ * @param terminalStates the terminal states
+ */
+ public StateMachine(String name, Executor executor, T initialState, Iterable<T> terminalStates) {
+ this.name = requireNonNull(name, "name is null");
+ this.executor = requireNonNull(executor, "executor is null");
+ this.state = requireNonNull(initialState, "initialState is null");
+ this.terminalStates =
+ ImmutableSet.copyOf(requireNonNull(terminalStates, "terminalStates is null"));
+ }
+
+ // state changes are atomic and state is volatile, so a direct read is safe here
+ @SuppressWarnings("FieldAccessNotGuarded")
+ public T get() {
+ return state;
+ }
+
+ /**
+ * Sets the state. If the new state does not {@code .equals()} the current state, listeners and
+ * waiters will be notified.
+ *
+ * @return the old state
+ * @throws IllegalStateException if state change would cause a transition from a terminal state
+ */
+ public T set(T newState) {
+ T oldState = trySet(newState);
+ checkState(
+ oldState.equals(newState) || !isTerminalState(oldState),
+ "%s cannot transition from %s to %s",
+ name,
+ state,
+ newState);
+ return oldState;
+ }
+
+ /**
+ * Tries to change the state. State will not change if the new state {@code .equals()} the current
+ * state, of if the current state is a terminal state. If the state changed, listeners and waiters
+ * will be notified.
+ *
+ * @return the state before the possible state change
+ */
+ public T trySet(T newState) {
+ checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+ requireNonNull(newState, "newState is null");
+
+ T oldState;
+ FutureStateChange<T> futureStateChange;
+ ImmutableList<StateChangeListener<T>> stateChangeListeners;
+ synchronized (lock) {
+ if (state.equals(newState) || isTerminalState(state)) {
return state;
- }
+ }
- /**
- * Sets the state.
- * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
- *
- * @return the old state
- * @throws IllegalStateException if state change would cause a transition from a terminal state
- */
- public T set(T newState)
- {
- T oldState = trySet(newState);
- checkState(oldState.equals(newState) || !isTerminalState(oldState), "%s cannot transition from %s to %s", name, state, newState);
- return oldState;
- }
+ oldState = state;
+ state = newState;
- /**
- * Tries to change the state. State will not change if the new state {@code .equals()} the current state,
- * of if the current state is a terminal state. If the state changed, listeners and waiters will be notified.
- *
- * @return the state before the possible state change
- */
- public T trySet(T newState)
- {
- checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
- requireNonNull(newState, "newState is null");
-
- T oldState;
- FutureStateChange<T> futureStateChange;
- ImmutableList<StateChangeListener<T>> stateChangeListeners;
- synchronized (lock) {
- if (state.equals(newState) || isTerminalState(state)) {
- return state;
- }
-
- oldState = state;
- state = newState;
-
- futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
- stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
-
- // if we are now in a terminal state, free the listeners since this will be the last notification
- if (isTerminalState(state)) {
- this.stateChangeListeners.clear();
- }
- }
-
- fireStateChanged(newState, futureStateChange, stateChangeListeners);
- return oldState;
- }
+ futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
+ stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
- /**
- * Sets the state if the current state satisfies the specified predicate.
- * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
- *
- * @return true if the state is set
- */
- public boolean setIf(T newState, Predicate<T> predicate)
- {
- checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
- requireNonNull(newState, "newState is null");
-
- while (true) {
- // check if the current state passes the predicate
- T currentState = get();
-
- // change to same state is not a change, and does not notify the notify listeners
- if (currentState.equals(newState)) {
- return false;
- }
-
- // do not call predicate while holding the lock
- if (!predicate.test(currentState)) {
- return false;
- }
-
- // if state did not change while, checking the predicate, apply the new state
- if (compareAndSet(currentState, newState)) {
- return true;
- }
- }
+ // if we are now in a terminal state, free the listeners since this will be the last
+ // notification
+ if (isTerminalState(state)) {
+ this.stateChangeListeners.clear();
+ }
}
- /**
- * Sets the state if the current state {@code .equals()} the specified expected state.
- * If the new state does not {@code .equals()} the current state, listeners and waiters will be notified.
- *
- * @return true if the state is set
- */
- public boolean compareAndSet(T expectedState, T newState)
- {
- checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
- requireNonNull(expectedState, "expectedState is null");
- requireNonNull(newState, "newState is null");
-
- FutureStateChange<T> futureStateChange;
- ImmutableList<StateChangeListener<T>> stateChangeListeners;
- synchronized (lock) {
- if (!state.equals(expectedState)) {
- return false;
- }
-
- // change to same state is not a change, and does not notify the notify listeners
- if (state.equals(newState)) {
- return false;
- }
-
- checkState(!isTerminalState(state), "%s cannot transition from %s to %s", name, state, newState);
-
- state = newState;
-
- futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
- stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
-
- // if we are now in a terminal state, free the listeners since this will be the last notification
- if (isTerminalState(state)) {
- this.stateChangeListeners.clear();
- }
- }
-
- fireStateChanged(newState, futureStateChange, stateChangeListeners);
+ fireStateChanged(newState, futureStateChange, stateChangeListeners);
+ return oldState;
+ }
+
+ /**
+ * Sets the state if the current state satisfies the specified predicate. If the new state does
+ * not {@code .equals()} the current state, listeners and waiters will be notified.
+ *
+ * @return true if the state is set
+ */
+ public boolean setIf(T newState, Predicate<T> predicate) {
+ checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+ requireNonNull(newState, "newState is null");
+
+ while (true) {
+ // check if the current state passes the predicate
+ T currentState = get();
+
+ // change to same state is not a change, and does not notify the notify listeners
+ if (currentState.equals(newState)) {
+ return false;
+ }
+
+ // do not call predicate while holding the lock
+ if (!predicate.test(currentState)) {
+ return false;
+ }
+
+ // if state did not change while, checking the predicate, apply the new state
+ if (compareAndSet(currentState, newState)) {
return true;
+ }
}
-
- private void fireStateChanged(T newState, FutureStateChange<T> futureStateChange, List<StateChangeListener<T>> stateChangeListeners)
- {
- checkState(!Thread.holdsLock(lock), "Cannot fire state change event while holding the lock");
- requireNonNull(newState, "newState is null");
-
- // always fire listener callbacks from a different thread
- safeExecute(() -> {
- checkState(!Thread.holdsLock(lock), "Cannot notify while holding the lock");
- try {
- futureStateChange.complete(newState);
- }
- catch (Throwable e) {
-// log.error(e, "Error setting future state for %s", name);
- }
- for (StateChangeListener<T> stateChangeListener : stateChangeListeners) {
- fireStateChangedListener(newState, stateChangeListener);
- }
- });
+ }
+
+ /**
+ * Sets the state if the current state {@code .equals()} the specified expected state. If the new
+ * state does not {@code .equals()} the current state, listeners and waiters will be notified.
+ *
+ * @return true if the state is set
+ */
+ public boolean compareAndSet(T expectedState, T newState) {
+ checkState(!Thread.holdsLock(lock), "Cannot set state while holding the lock");
+ requireNonNull(expectedState, "expectedState is null");
+ requireNonNull(newState, "newState is null");
+
+ FutureStateChange<T> futureStateChange;
+ ImmutableList<StateChangeListener<T>> stateChangeListeners;
+ synchronized (lock) {
+ if (!state.equals(expectedState)) {
+ return false;
+ }
+
+ // change to same state is not a change, and does not notify the notify listeners
+ if (state.equals(newState)) {
+ return false;
+ }
+
+ checkState(
+ !isTerminalState(state), "%s cannot transition from %s to %s", name, state, newState);
+
+ state = newState;
+
+ futureStateChange = this.futureStateChange.getAndSet(new FutureStateChange<>());
+ stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
+
+ // if we are now in a terminal state, free the listeners since this will be the last
+ // notification
+ if (isTerminalState(state)) {
+ this.stateChangeListeners.clear();
+ }
}
- private void fireStateChangedListener(T newState, StateChangeListener<T> stateChangeListener)
- {
- try {
- stateChangeListener.stateChanged(newState);
- }
- catch (Throwable e) {
-// log.error(e, "Error notifying state change listener for %s", name);
- }
- }
+ fireStateChanged(newState, futureStateChange, stateChangeListeners);
+ return true;
+ }
+
+ private void fireStateChanged(
+ T newState,
+ FutureStateChange<T> futureStateChange,
+ List<StateChangeListener<T>> stateChangeListeners) {
+ checkState(!Thread.holdsLock(lock), "Cannot fire state change event while holding the lock");
+ requireNonNull(newState, "newState is null");
+
+ // always fire listener callbacks from a different thread
+ safeExecute(
+ () -> {
+ checkState(!Thread.holdsLock(lock), "Cannot notify while holding the lock");
+ try {
+ futureStateChange.complete(newState);
+ } catch (Throwable e) {
+ // log.error(e, "Error setting future state for %s", name);
+ }
+ for (StateChangeListener<T> stateChangeListener : stateChangeListeners) {
+ fireStateChangedListener(newState, stateChangeListener);
+ }
+ });
+ }
- /**
- * Gets a future that completes when the state is no longer {@code .equals()} to {@code currentState)}.
- */
- public ListenableFuture<T> getStateChange(T currentState)
- {
- checkState(!Thread.holdsLock(lock), "Cannot wait for state change while holding the lock");
- requireNonNull(currentState, "currentState is null");
-
- synchronized (lock) {
- // return a completed future if the state has already changed, or we are in a terminal state
- if (!state.equals(currentState) || isTerminalState(state)) {
- return immediateFuture(state);
- }
-
- return futureStateChange.get().createNewListener();
- }
+ private void fireStateChangedListener(T newState, StateChangeListener<T> stateChangeListener) {
+ try {
+ stateChangeListener.stateChanged(newState);
+ } catch (Throwable e) {
+ // log.error(e, "Error notifying state change listener for %s", name);
}
-
- /**
- * Adds a listener to be notified when the state instance changes according to {@code .equals()}.
- * Listener is always notified asynchronously using a dedicated notification thread pool so, care should
- * be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
- * possible notifications are observed out of order due to the asynchronous execution. The listener is
- * immediately notified immediately of the current state.
- */
- public void addStateChangeListener(StateChangeListener<T> stateChangeListener)
- {
- requireNonNull(stateChangeListener, "stateChangeListener is null");
-
- boolean inTerminalState;
- T currentState;
- synchronized (lock) {
- currentState = state;
- inTerminalState = isTerminalState(currentState);
- if (!inTerminalState) {
- stateChangeListeners.add(stateChangeListener);
- }
- }
-
- // fire state change listener with the current state
- // always fire listener callbacks from a different thread
- safeExecute(() -> stateChangeListener.stateChanged(currentState));
+ }
+
+ /**
+ * Gets a future that completes when the state is no longer {@code .equals()} to {@code
+ * currentState)}.
+ */
+ public ListenableFuture<T> getStateChange(T currentState) {
+ checkState(!Thread.holdsLock(lock), "Cannot wait for state change while holding the lock");
+ requireNonNull(currentState, "currentState is null");
+
+ synchronized (lock) {
+ // return a completed future if the state has already changed, or we are in a terminal state
+ if (!state.equals(currentState) || isTerminalState(state)) {
+ return immediateFuture(state);
+ }
+
+ return futureStateChange.get().createNewListener();
}
-
- @VisibleForTesting
- boolean isTerminalState(T state)
- {
- return terminalStates.contains(state);
+ }
+
+ /**
+ * Adds a listener to be notified when the state instance changes according to {@code .equals()}.
+ * Listener is always notified asynchronously using a dedicated notification thread pool so, care
+ * should be taken to avoid leaking {@code this} when adding a listener in a constructor.
+ * Additionally, it is possible notifications are observed out of order due to the asynchronous
+ * execution. The listener is immediately notified immediately of the current state.
+ */
+ public void addStateChangeListener(StateChangeListener<T> stateChangeListener) {
+ requireNonNull(stateChangeListener, "stateChangeListener is null");
+
+ boolean inTerminalState;
+ T currentState;
+ synchronized (lock) {
+ currentState = state;
+ inTerminalState = isTerminalState(currentState);
+ if (!inTerminalState) {
+ stateChangeListeners.add(stateChangeListener);
+ }
}
- @VisibleForTesting
- List<StateChangeListener<T>> getStateChangeListeners()
- {
- synchronized (lock) {
- return ImmutableList.copyOf(stateChangeListeners);
- }
- }
+ // fire state change listener with the current state
+ // always fire listener callbacks from a different thread
+ safeExecute(() -> stateChangeListener.stateChanged(currentState));
+ }
- public interface StateChangeListener<T>
- {
- void stateChanged(T newState);
- }
+ @VisibleForTesting
+ boolean isTerminalState(T state) {
+ return terminalStates.contains(state);
+ }
- @Override
- public String toString()
- {
- return get().toString();
+ @VisibleForTesting
+ List<StateChangeListener<T>> getStateChangeListeners() {
+ synchronized (lock) {
+ return ImmutableList.copyOf(stateChangeListeners);
}
-
- private void safeExecute(Runnable command)
- {
- try {
- executor.execute(command);
- }
- catch (RejectedExecutionException e) {
- if ((executor instanceof ExecutorService) && ((ExecutorService) executor).isShutdown()) {
- // TODO: (xingtanzjr) handle the exception
- throw new RuntimeException("Server is shutting down", e);
- }
- throw e;
- }
+ }
+
+ public interface StateChangeListener<T> {
+ void stateChanged(T newState);
+ }
+
+ @Override
+ public String toString() {
+ return get().toString();
+ }
+
+ private void safeExecute(Runnable command) {
+ try {
+ executor.execute(command);
+ } catch (RejectedExecutionException e) {
+ if ((executor instanceof ExecutorService) && ((ExecutorService) executor).isShutdown()) {
+ // TODO: (xingtanzjr) handle the exception
+ throw new RuntimeException("Server is shutting down", e);
+ }
+ throw e;
}
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
index 6f283b8994..de842f0207 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
+
import org.apache.thrift.TException;
import java.util.List;
@@ -33,28 +34,32 @@ import java.util.concurrent.ExecutorService;
public abstract class AbstractFragInsStateFetcher implements IFragInstanceStateFetcher {
- protected QueryStateMachine stateMachine;
- protected ExecutorService executor;
- protected List<FragmentInstance> instances;
-
- public AbstractFragInsStateFetcher(QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
- this.stateMachine = stateMachine;
- this.executor = executor;
- this.instances = instances;
- }
+ protected QueryStateMachine stateMachine;
+ protected ExecutorService executor;
+ protected List<FragmentInstance> instances;
- public abstract void start();
+ public AbstractFragInsStateFetcher(
+ QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+ this.stateMachine = stateMachine;
+ this.executor = executor;
+ this.instances = instances;
+ }
- protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
- InternalService.Client client = InternalServiceClientFactory
- .getInternalServiceClient(instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
- TFragmentInstanceStateResp resp = client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
- return FragmentInstanceState.valueOf(resp.state);
- }
+ public abstract void start();
- private TFragmentInstanceId getTId(FragmentInstance instance) {
- return new TFragmentInstanceId(instance.getId().getQueryId().getId(),
- String.valueOf(instance.getId().getFragmentId().getId()), instance.getId().getInstanceId());
- }
+ protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
+ InternalService.Client client =
+ InternalServiceClientFactory.getInternalServiceClient(
+ instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+ TFragmentInstanceStateResp resp =
+ client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+ return FragmentInstanceState.valueOf(resp.state);
+ }
+ private TFragmentInstanceId getTId(FragmentInstance instance) {
+ return new TFragmentInstanceId(
+ instance.getId().getQueryId().getId(),
+ String.valueOf(instance.getId().getFragmentId().getId()),
+ instance.getId().getInstanceId());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index c4050a3045..8c6c2991bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.scheduler;
-import io.airlift.units.Duration;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.FragmentInfo;
@@ -27,6 +26,8 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import io.airlift.units.Duration;
+
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -46,7 +47,8 @@ public class ClusterScheduler implements IScheduler {
private List<FragmentInstance> instances;
private IFragInstanceDispatcher dispatcher;
- public ClusterScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
+ public ClusterScheduler(
+ QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
this.stateMachine = stateMachine;
this.instances = instances;
this.queryType = queryType;
@@ -74,11 +76,13 @@ public class ClusterScheduler implements IScheduler {
return;
}
- // The FragmentInstances has been dispatched successfully to corresponding host, we mark the QueryState to Running
+ // The FragmentInstances has been dispatched successfully to corresponding host, we mark the
+ // QueryState to Running
stateMachine.transitionToRunning();
- instances.forEach(instance -> {
- stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
- });
+ instances.forEach(
+ instance -> {
+ stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
+ });
// TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
index 583f2503f1..9c0bc9e807 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
@@ -22,40 +22,44 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+
import org.apache.thrift.TException;
import java.util.List;
import java.util.concurrent.*;
public class FixedRateFragInsStateFetcher extends AbstractFragInsStateFetcher {
- private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
- private volatile boolean aborted = false;
+ private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
+ private volatile boolean aborted = false;
- public FixedRateFragInsStateFetcher(QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
- super(stateMachine, executor, instances);
- }
+ public FixedRateFragInsStateFetcher(
+ QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+ super(stateMachine, executor, instances);
+ }
- @Override
- public void start() {
- while (!aborted) {
- try {
- Future<Boolean> future = executor.submit(() -> {
- for (FragmentInstance instance : instances) {
- try {
- FragmentInstanceState state = fetchState(instance);
- stateMachine.updateFragInstanceState(instance.getId(), state);
- } catch (TException e) {
- // TODO: do nothing ?
- return false;
- }
+ @Override
+ public void start() {
+ while (!aborted) {
+ try {
+ Future<Boolean> future =
+ executor.submit(
+ () -> {
+ for (FragmentInstance instance : instances) {
+ try {
+ FragmentInstanceState state = fetchState(instance);
+ stateMachine.updateFragInstanceState(instance.getId(), state);
+ } catch (TException e) {
+ // TODO: do nothing ?
+ return false;
}
- return true;
+ }
+ return true;
});
- future.get();
- Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
- } catch (InterruptedException | ExecutionException e) {
- // TODO:
- }
- }
+ future.get();
+ Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO:
+ }
}
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
index 8108c9c0c0..fbafb7a34b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FragInstanceDispatchResult.java
@@ -20,13 +20,13 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
public class FragInstanceDispatchResult {
- private boolean successful;
+ private boolean successful;
- public FragInstanceDispatchResult(boolean successful) {
- this.successful = successful;
- }
+ public FragInstanceDispatchResult(boolean successful) {
+ this.successful = successful;
+ }
- public boolean isSuccessful() {
- return successful;
- }
+ public boolean isSuccessful() {
+ return successful;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
index 9fdedabb30..c9574fdb4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceDispatcher.java
@@ -25,12 +25,13 @@ import java.util.List;
import java.util.concurrent.Future;
public interface IFragInstanceDispatcher {
- /**
- * Dispatch all Fragment instances asynchronously
- * @param instances Fragment instance list
- * @return Boolean.
- */
- Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
+ /**
+ * Dispatch all Fragment instances asynchronously
+ *
+ * @param instances Fragment instance list
+ * @return Boolean.
+ */
+ Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances);
- void abort();
+ void abort();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
index 33d0d5343f..3c14ee27ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
@@ -20,5 +20,5 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
public interface IFragInstanceStateFetcher {
- void start();
+ void start();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
index d79b66ca6b..8dfc825c0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
@@ -27,11 +28,12 @@ import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
public class InternalServiceClientFactory {
- // TODO: (xingtanzjr) consider the best practice to maintain the clients
- public static InternalService.Client getInternalServiceClient(String endpoint, int port) throws TTransportException {
- TTransport transport = new TSocket(endpoint, port);
- transport.open();
- TProtocol protocol = new TBinaryProtocol(transport);
- return new InternalService.Client(protocol);
- }
+ // TODO: (xingtanzjr) consider the best practice to maintain the clients
+ public static InternalService.Client getInternalServiceClient(String endpoint, int port)
+ throws TTransportException {
+ TTransport transport = new TSocket(endpoint, port);
+ transport.open();
+ TProtocol protocol = new TBinaryProtocol(transport);
+ return new InternalService.Client(protocol);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index a48fc7b7c3..f47d3df45e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+
import org.apache.thrift.TException;
import java.util.List;
@@ -29,27 +30,28 @@ import java.util.concurrent.FutureTask;
public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
- @Override
- public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
- FutureTask<FragInstanceDispatchResult> dispatchTask = new FutureTask<>(() -> {
- try {
+ @Override
+ public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) {
+ FutureTask<FragInstanceDispatchResult> dispatchTask =
+ new FutureTask<>(
+ () -> {
+ try {
for (FragmentInstance instance : instances) {
- InternalService.Client client = InternalServiceClientFactory.
- getInternalServiceClient(instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
- client.sendFragmentInstance(null);
+ InternalService.Client client =
+ InternalServiceClientFactory.getInternalServiceClient(
+ instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+ client.sendFragmentInstance(null);
}
- } catch (TException e) {
+ } catch (TException e) {
// TODO: (xingtanzjr) add more details
return new FragInstanceDispatchResult(false);
- }
- return new FragInstanceDispatchResult(true);
- });
- new Thread(dispatchTask).start();
- return dispatchTask;
- }
-
- @Override
- public void abort() {
+ }
+ return new FragInstanceDispatchResult(true);
+ });
+ new Thread(dispatchTask).start();
+ return dispatchTask;
+ }
- }
+ @Override
+ public void abort() {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 26562059ab..c1085e15d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
-import org.apache.commons.lang.Validate;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -26,6 +25,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang.Validate;
import java.nio.ByteBuffer;
import java.util.List;
@@ -57,7 +57,9 @@ public class FragmentSinkNode extends SinkNode {
@Override
public PlanNode cloneWithChildren(List<PlanNode> children) {
- Validate.isTrue(children == null || children.size() == 1, "Children size of FragmentSinkNode should be 0 or 1");
+ Validate.isTrue(
+ children == null || children.size() == 1,
+ "Children size of FragmentSinkNode should be 0 or 1");
FragmentSinkNode sinkNode = (FragmentSinkNode) clone();
if (children != null) {
sinkNode.setChild(children.get(0));