You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 03:39:46 UTC
[iotdb] 04/04: add basic scheduler
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 30508073319647b10fd29ef4a4a91017b5171758
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 11:39:33 2022 +0800
add basic scheduler
---
.../apache/iotdb/db/mpp/execution/Coordinator.java | 27 +++++++++++-
.../{ExecutionStatus.java => ExecutionResult.java} | 4 +-
.../iotdb/db/mpp/execution/QueryExecution.java | 28 +++++++++---
.../iotdb/db/mpp/execution/QueryStateMachine.java | 5 ++-
...tcher.java => AbstractFragInsStateTracker.java} | 11 +++--
.../mpp/execution/scheduler/ClusterScheduler.java | 36 ++++++++++++----
...cher.java => FixedRateFragInsStateTracker.java} | 50 +++++++++++-----------
...Fetcher.java => IFragInstanceStateTracker.java} | 4 +-
.../db/service/thrift/impl/TSServiceImpl.java | 4 +-
.../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 6 ++-
10 files changed, 121 insertions(+), 54 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 3ce8f7ea00..4e3f8ad681 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -25,6 +26,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
/**
* The coordinator for MPP. It manages all the queries which are executed in current Node. And it
@@ -32,6 +35,13 @@ import java.util.concurrent.ConcurrentHashMap;
* QueryExecution.
*/
public class Coordinator {
+ private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
+ private static final int COORDINATOR_EXECUTOR_SIZE = 10;
+ private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
+ private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
+
+ private ExecutorService executor;
+ private ScheduledExecutorService scheduledExecutor;
private static final Coordinator INSTANCE = new Coordinator();
@@ -39,13 +49,15 @@ public class Coordinator {
private Coordinator() {
this.queryExecutionMap = new ConcurrentHashMap<>();
+ this.executor = getQueryExecutor();
+ this.scheduledExecutor = getScheduledExecutor();
}
private QueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
- return new QueryExecution(statement, queryContext);
+ return new QueryExecution(statement, queryContext, executor, scheduledExecutor);
}
- public ExecutionStatus execute(
+ public ExecutionResult execute(
Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
QueryExecution execution =
@@ -57,6 +69,17 @@ public class Coordinator {
return execution.getStatus();
}
+ // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
+ private ExecutorService getQueryExecutor() {
+ return IoTDBThreadPoolFactory.newFixedThreadPool(
+ COORDINATOR_EXECUTOR_SIZE, COORDINATOR_EXECUTOR_NAME);
+ }
+ // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
+ private ScheduledExecutorService getScheduledExecutor() {
+ return IoTDBThreadPoolFactory.newScheduledThreadPool(
+ COORDINATOR_SCHEDULED_EXECUTOR_SIZE, COORDINATOR_SCHEDULED_EXECUTOR_NAME);
+ }
+
public static Coordinator getInstance() {
return INSTANCE;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
index 8c24ade473..4dae202408 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-public class ExecutionStatus {
+public class ExecutionResult {
public QueryId queryId;
public TSStatus status;
- public ExecutionStatus(QueryId queryId, TSStatus status) {
+ public ExecutionResult(QueryId queryId, TSStatus status) {
this.queryId = queryId;
this.status = status;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index dd48dff9ec..c88512771f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -39,6 +39,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
/**
* QueryExecution stores all the status of a query which is being prepared or running inside the MPP
@@ -57,15 +59,24 @@ public class QueryExecution {
private LogicalQueryPlan logicalPlan;
private DistributedQueryPlan distributedPlan;
+ private ExecutorService executor;
+ private ScheduledExecutorService scheduledExecutor;
+
// The result of QueryExecution will be written to the DataBlockManager in current Node.
// We use this SourceHandle to fetch the TsBlock from it.
private ISourceHandle resultHandle;
- public QueryExecution(Statement statement, MPPQueryContext context) {
+ public QueryExecution(
+ Statement statement,
+ MPPQueryContext context,
+ ExecutorService executor,
+ ScheduledExecutorService scheduledExecutor) {
+ this.executor = executor;
+ this.scheduledExecutor = scheduledExecutor;
this.context = context;
this.planOptimizers = new ArrayList<>();
this.analysis = analyze(statement, context);
- this.stateMachine = new QueryStateMachine(context.getQueryId());
+ this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
// We add the abort logic inside the QueryExecution.
// So that the other components can only focus on the state change.
@@ -93,7 +104,12 @@ public class QueryExecution {
private void schedule() {
// TODO: (xingtanzjr) initialize the query scheduler according to configuration
this.scheduler =
- new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
+ new ClusterScheduler(
+ stateMachine,
+ distributedPlan.getInstances(),
+ context.getQueryType(),
+ executor,
+ scheduledExecutor);
// TODO: (xingtanzjr) how to make the schedule running asynchronously
this.scheduler.start();
}
@@ -141,7 +157,7 @@ public class QueryExecution {
*
* @return ExecutionStatus. Contains the QueryId and the TSStatus.
*/
- public ExecutionStatus getStatus() {
+ public ExecutionResult getStatus() {
// Although we monitor the state to transition to RUNNING, the future will return if any
// Terminated state is triggered
ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.RUNNING);
@@ -152,10 +168,10 @@ public class QueryExecution {
state == QueryState.FINISHED
? TSStatusCode.SUCCESS_STATUS
: TSStatusCode.QUERY_PROCESS_ERROR;
- return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(statusCode));
+ return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode));
} catch (InterruptedException | ExecutionException e) {
// TODO: (xingtanzjr) use more accurate error handling
- return new ExecutionStatus(
+ return new ExecutionResult(
context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 0ed499c785..53f33e859c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -27,6 +27,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
/**
* State machine for a QueryExecution. It stores the states for the QueryExecution. Others can
@@ -40,9 +41,9 @@ public class QueryStateMachine {
// The executor will be used in all the state machines belonged to this query.
private Executor stateMachineExecutor;
- public QueryStateMachine(QueryId queryId) {
+ public QueryStateMachine(QueryId queryId, ExecutorService executor) {
this.name = String.format("QueryStateMachine[%s]", queryId);
- this.stateMachineExecutor = getStateMachineExecutor();
+ this.stateMachineExecutor = executor;
this.fragInstanceStateMap = new ConcurrentHashMap<>();
this.queryState =
new StateMachine<>(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index de842f0207..b7908f048b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -31,22 +31,27 @@ import org.apache.thrift.TException;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
-public abstract class AbstractFragInsStateFetcher implements IFragInstanceStateFetcher {
+public abstract class AbstractFragInsStateTracker implements IFragInstanceStateTracker {
protected QueryStateMachine stateMachine;
protected ExecutorService executor;
+ protected ScheduledExecutorService scheduledExecutor;
protected List<FragmentInstance> instances;
- public AbstractFragInsStateFetcher(
- QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+ public AbstractFragInsStateTracker(
+ QueryStateMachine stateMachine, ExecutorService executor, ScheduledExecutorService scheduledExecutor, List<FragmentInstance> instances) {
this.stateMachine = stateMachine;
this.executor = executor;
+ this.scheduledExecutor = scheduledExecutor;
this.instances = instances;
}
public abstract void start();
+ public abstract void abort();
+
protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
InternalService.Client client =
InternalServiceClientFactory.getInternalServiceClient(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 8c6c2991bc..2fa2971d0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -30,7 +30,9 @@ import io.airlift.units.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
/**
* QueryScheduler is used to dispatch the fragment instances of a query to target nodes. And it will
@@ -47,12 +49,24 @@ public class ClusterScheduler implements IScheduler {
private List<FragmentInstance> instances;
private IFragInstanceDispatcher dispatcher;
+ private ExecutorService executor;
+ private ScheduledExecutorService scheduledExecutor;
+
+ private IFragInstanceStateTracker stateTracker;
+
public ClusterScheduler(
- QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
+ QueryStateMachine stateMachine,
+ List<FragmentInstance> instances,
+ QueryType queryType,
+ ExecutorService executor,
+ ScheduledExecutorService scheduledExecutor) {
this.stateMachine = stateMachine;
this.instances = instances;
this.queryType = queryType;
+ this.executor = executor;
+ this.scheduledExecutor = scheduledExecutor;
this.dispatcher = new SimpleFragInstanceDispatcher();
+ this.stateTracker = new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
}
@Override
@@ -85,7 +99,7 @@ public class ClusterScheduler implements IScheduler {
});
// TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
-
+ this.stateTracker.start();
}
private boolean waitDispatchingFinished(Future<FragInstanceDispatchResult> dispatchResultFuture) {
@@ -102,9 +116,9 @@ public class ClusterScheduler implements IScheduler {
@Override
public void abort() {
- if (this.dispatcher != null) {
- dispatcher.abort();
- }
+ // TODO: null checks look unnecessary here because both fields are assigned in the constructor; confirm whether omitting them is the preferred practice.
+ dispatcher.abort();
+ stateTracker.abort();
}
@Override
@@ -118,14 +132,18 @@ public class ClusterScheduler implements IScheduler {
}
@Override
- public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
+ public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {
+ }
@Override
- public void cancelFragment(PlanFragmentId planFragmentId) {}
+ public void cancelFragment(PlanFragmentId planFragmentId) {
+ }
// Send the instances to other nodes
- private void sendFragmentInstances() {}
+ private void sendFragmentInstances() {
+ }
// After sending, start to collect the states of these fragment instances
- private void startMonitorInstances() {}
+ private void startMonitorInstances() {
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index 9c0bc9e807..cc993298d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -28,37 +28,37 @@ import org.apache.thrift.TException;
import java.util.List;
import java.util.concurrent.*;
-public class FixedRateFragInsStateFetcher extends AbstractFragInsStateFetcher {
- private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
- private volatile boolean aborted = false;
+public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
+ // TODO: (xingtanzjr) decide what fetch interval is appropriate for the state tracker
+ private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
+ private ScheduledFuture<?> trackTask;
- public FixedRateFragInsStateFetcher(
- QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
- super(stateMachine, executor, instances);
+ public FixedRateFragInsStateTracker(
+ QueryStateMachine stateMachine, ExecutorService executor, ScheduledExecutorService scheduledExecutor, List<FragmentInstance> instances) {
+ super(stateMachine, executor, scheduledExecutor, instances);
}
@Override
public void start() {
- while (!aborted) {
+ trackTask = scheduledExecutor.scheduleAtFixedRate(this::fetchStateAndUpdate, 0, STATE_FETCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void abort() {
+ if (trackTask != null) {
+ trackTask.cancel(true);
+ }
+ }
+
+ private void fetchStateAndUpdate() {
+ for (FragmentInstance instance : instances) {
try {
- Future<Boolean> future =
- executor.submit(
- () -> {
- for (FragmentInstance instance : instances) {
- try {
- FragmentInstanceState state = fetchState(instance);
- stateMachine.updateFragInstanceState(instance.getId(), state);
- } catch (TException e) {
- // TODO: do nothing ?
- return false;
- }
- }
- return true;
- });
- future.get();
- Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
- } catch (InterruptedException | ExecutionException e) {
- // TODO:
+ FragmentInstanceState state = fetchState(instance);
+ if (state != null) {
+ stateMachine.updateFragInstanceState(instance.getId(), state);
+ }
+ } catch (TException e) {
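+ // TODO: decide how to handle a failed state fetch; for now the error is ignored and the instance is polled again on the next scheduled run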
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateTracker.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateTracker.java
index 3c14ee27ab..05dbc388bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateTracker.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
-public interface IFragInstanceStateFetcher {
+public interface IFragInstanceStateTracker {
void start();
+
+ void abort();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index e348836021..315620643e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.Coordinator;
-import org.apache.iotdb.db.mpp.execution.ExecutionStatus;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
@@ -1660,7 +1660,7 @@ public class TSServiceImpl implements TSIService.Iface {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId(false);
- ExecutionStatus result =
+ ExecutionResult result =
coordinator.execute(
statement,
new QueryId(String.valueOf(queryId)),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index 86527e2788..e38952fea3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.plan;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -47,8 +48,9 @@ public class QueryPlannerTest {
QueryExecution queryExecution =
new QueryExecution(
stmt,
- new MPPQueryContext(
- querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ));
+ new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ),
+ IoTDBThreadPoolFactory.newSingleThreadExecutor("Test-Query"),
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Test-Query-Scheduled"));
queryExecution.doLogicalPlan();
System.out.printf("SQL: %s%n%n", querySql);
System.out.println("===== Step 1: Logical Plan =====");