You are viewing a plain-text version of this content. The canonical (hyperlinked) version is available from the original mailing-list archive page.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 03:39:46 UTC

[iotdb] 04/04: add basic scheduler

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 30508073319647b10fd29ef4a4a91017b5171758
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 11:39:33 2022 +0800

    add basic scheduler
---
 .../apache/iotdb/db/mpp/execution/Coordinator.java | 27 +++++++++++-
 .../{ExecutionStatus.java => ExecutionResult.java} |  4 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     | 28 +++++++++---
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  5 ++-
 ...tcher.java => AbstractFragInsStateTracker.java} | 11 +++--
 .../mpp/execution/scheduler/ClusterScheduler.java  | 36 ++++++++++++----
 ...cher.java => FixedRateFragInsStateTracker.java} | 50 +++++++++++-----------
 ...Fetcher.java => IFragInstanceStateTracker.java} |  4 +-
 .../db/service/thrift/impl/TSServiceImpl.java      |  4 +-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  6 ++-
 10 files changed, 121 insertions(+), 54 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 3ce8f7ea00..4e3f8ad681 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -25,6 +26,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * The coordinator for MPP. It manages all the queries which are executed in current Node. And it
@@ -32,6 +35,13 @@ import java.util.concurrent.ConcurrentHashMap;
  * QueryExecution.
  */
 public class Coordinator {
+  private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
+  private static final int COORDINATOR_EXECUTOR_SIZE = 10;
+  private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
+  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
+
+  private ExecutorService executor;
+  private ScheduledExecutorService scheduledExecutor;
 
   private static final Coordinator INSTANCE = new Coordinator();
 
@@ -39,13 +49,15 @@ public class Coordinator {
 
   private Coordinator() {
     this.queryExecutionMap = new ConcurrentHashMap<>();
+    this.executor = getQueryExecutor();
+    this.scheduledExecutor = getScheduledExecutor();
   }
 
   private QueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
-    return new QueryExecution(statement, queryContext);
+    return new QueryExecution(statement, queryContext, executor, scheduledExecutor);
   }
 
-  public ExecutionStatus execute(
+  public ExecutionResult execute(
       Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
 
     QueryExecution execution =
@@ -57,6 +69,17 @@ public class Coordinator {
     return execution.getStatus();
   }
 
+  // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
+  private ExecutorService getQueryExecutor() {
+    return IoTDBThreadPoolFactory.newFixedThreadPool(
+        COORDINATOR_EXECUTOR_SIZE, COORDINATOR_EXECUTOR_NAME);
+  }
+  // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
+  private ScheduledExecutorService getScheduledExecutor() {
+    return IoTDBThreadPoolFactory.newScheduledThreadPool(
+        COORDINATOR_SCHEDULED_EXECUTOR_SIZE, COORDINATOR_SCHEDULED_EXECUTOR_NAME);
+  }
+
   public static Coordinator getInstance() {
     return INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
index 8c24ade473..4dae202408 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.db.mpp.execution;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-public class ExecutionStatus {
+public class ExecutionResult {
   public QueryId queryId;
   public TSStatus status;
 
-  public ExecutionStatus(QueryId queryId, TSStatus status) {
+  public ExecutionResult(QueryId queryId, TSStatus status) {
     this.queryId = queryId;
     this.status = status;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index dd48dff9ec..c88512771f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -39,6 +39,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * QueryExecution stores all the status of a query which is being prepared or running inside the MPP
@@ -57,15 +59,24 @@ public class QueryExecution {
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
 
+  private ExecutorService executor;
+  private ScheduledExecutorService scheduledExecutor;
+
   // The result of QueryExecution will be written to the DataBlockManager in current Node.
   // We use this SourceHandle to fetch the TsBlock from it.
   private ISourceHandle resultHandle;
 
-  public QueryExecution(Statement statement, MPPQueryContext context) {
+  public QueryExecution(
+      Statement statement,
+      MPPQueryContext context,
+      ExecutorService executor,
+      ScheduledExecutorService scheduledExecutor) {
+    this.executor = executor;
+    this.scheduledExecutor = scheduledExecutor;
     this.context = context;
     this.planOptimizers = new ArrayList<>();
     this.analysis = analyze(statement, context);
-    this.stateMachine = new QueryStateMachine(context.getQueryId());
+    this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
 
     // We add the abort logic inside the QueryExecution.
     // So that the other components can only focus on the state change.
@@ -93,7 +104,12 @@ public class QueryExecution {
   private void schedule() {
     // TODO: (xingtanzjr) initialize the query scheduler according to configuration
     this.scheduler =
-        new ClusterScheduler(stateMachine, distributedPlan.getInstances(), context.getQueryType());
+        new ClusterScheduler(
+            stateMachine,
+            distributedPlan.getInstances(),
+            context.getQueryType(),
+            executor,
+            scheduledExecutor);
     // TODO: (xingtanzjr) how to make the schedule running asynchronously
     this.scheduler.start();
   }
@@ -141,7 +157,7 @@ public class QueryExecution {
    *
    * @return ExecutionStatus. Contains the QueryId and the TSStatus.
    */
-  public ExecutionStatus getStatus() {
+  public ExecutionResult getStatus() {
     // Although we monitor the state to transition to RUNNING, the future will return if any
     // Terminated state is triggered
     ListenableFuture<QueryState> future = stateMachine.getStateChange(QueryState.RUNNING);
@@ -152,10 +168,10 @@ public class QueryExecution {
           state == QueryState.FINISHED
               ? TSStatusCode.SUCCESS_STATUS
               : TSStatusCode.QUERY_PROCESS_ERROR;
-      return new ExecutionStatus(context.getQueryId(), RpcUtils.getStatus(statusCode));
+      return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode));
     } catch (InterruptedException | ExecutionException e) {
       // TODO: (xingtanzjr) use more accurate error handling
-      return new ExecutionStatus(
+      return new ExecutionResult(
           context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 0ed499c785..53f33e859c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -27,6 +27,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 /**
  * State machine for a QueryExecution. It stores the states for the QueryExecution. Others can
@@ -40,9 +41,9 @@ public class QueryStateMachine {
   // The executor will be used in all the state machines belonged to this query.
   private Executor stateMachineExecutor;
 
-  public QueryStateMachine(QueryId queryId) {
+  public QueryStateMachine(QueryId queryId, ExecutorService executor) {
     this.name = String.format("QueryStateMachine[%s]", queryId);
-    this.stateMachineExecutor = getStateMachineExecutor();
+    this.stateMachineExecutor = executor;
     this.fragInstanceStateMap = new ConcurrentHashMap<>();
     this.queryState =
         new StateMachine<>(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index de842f0207..b7908f048b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -31,22 +31,27 @@ import org.apache.thrift.TException;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
-public abstract class AbstractFragInsStateFetcher implements IFragInstanceStateFetcher {
+public abstract class AbstractFragInsStateTracker implements IFragInstanceStateTracker {
 
   protected QueryStateMachine stateMachine;
   protected ExecutorService executor;
+  protected ScheduledExecutorService scheduledExecutor;
   protected List<FragmentInstance> instances;
 
-  public AbstractFragInsStateFetcher(
-      QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
+  public AbstractFragInsStateTracker(
+      QueryStateMachine stateMachine, ExecutorService executor, ScheduledExecutorService scheduledExecutor, List<FragmentInstance> instances) {
     this.stateMachine = stateMachine;
     this.executor = executor;
+    this.scheduledExecutor = scheduledExecutor;
     this.instances = instances;
   }
 
   public abstract void start();
 
+  public abstract void abort();
+
   protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
     InternalService.Client client =
         InternalServiceClientFactory.getInternalServiceClient(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 8c6c2991bc..2fa2971d0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -30,7 +30,9 @@ import io.airlift.units.Duration;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * QueryScheduler is used to dispatch the fragment instances of a query to target nodes. And it will
@@ -47,12 +49,24 @@ public class ClusterScheduler implements IScheduler {
   private List<FragmentInstance> instances;
   private IFragInstanceDispatcher dispatcher;
 
+  private ExecutorService executor;
+  private ScheduledExecutorService scheduledExecutor;
+
+  private IFragInstanceStateTracker stateTracker;
+
   public ClusterScheduler(
-      QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType) {
+      QueryStateMachine stateMachine,
+      List<FragmentInstance> instances,
+      QueryType queryType,
+      ExecutorService executor,
+      ScheduledExecutorService scheduledExecutor) {
     this.stateMachine = stateMachine;
     this.instances = instances;
     this.queryType = queryType;
+    this.executor = executor;
+    this.scheduledExecutor = scheduledExecutor;
     this.dispatcher = new SimpleFragInstanceDispatcher();
+    this.stateTracker = new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
   }
 
   @Override
@@ -85,7 +99,7 @@ public class ClusterScheduler implements IScheduler {
         });
 
     // TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
-
+    this.stateTracker.start();
   }
 
   private boolean waitDispatchingFinished(Future<FragInstanceDispatchResult> dispatchResultFuture) {
@@ -102,9 +116,9 @@ public class ClusterScheduler implements IScheduler {
 
   @Override
   public void abort() {
-    if (this.dispatcher != null) {
-      dispatcher.abort();
-    }
+    // TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best practice ?
+    dispatcher.abort();
+    stateTracker.abort();
   }
 
   @Override
@@ -118,14 +132,18 @@ public class ClusterScheduler implements IScheduler {
   }
 
   @Override
-  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
+  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {
+  }
 
   @Override
-  public void cancelFragment(PlanFragmentId planFragmentId) {}
+  public void cancelFragment(PlanFragmentId planFragmentId) {
+  }
 
   // Send the instances to other nodes
-  private void sendFragmentInstances() {}
+  private void sendFragmentInstances() {
+  }
 
   // After sending, start to collect the states of these fragment instances
-  private void startMonitorInstances() {}
+  private void startMonitorInstances() {
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index 9c0bc9e807..cc993298d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -28,37 +28,37 @@ import org.apache.thrift.TException;
 import java.util.List;
 import java.util.concurrent.*;
 
-public class FixedRateFragInsStateFetcher extends AbstractFragInsStateFetcher {
-  private static final long STATE_FETCH_INTERVAL_IN_MS = 1000;
-  private volatile boolean aborted = false;
+public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
+  // TODO: (xingtanzjr) consider how much Interval is OK for state tracker
+  private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
+  private ScheduledFuture<?> trackTask;
 
-  public FixedRateFragInsStateFetcher(
-      QueryStateMachine stateMachine, ExecutorService executor, List<FragmentInstance> instances) {
-    super(stateMachine, executor, instances);
+  public FixedRateFragInsStateTracker(
+      QueryStateMachine stateMachine, ExecutorService executor, ScheduledExecutorService scheduledExecutor, List<FragmentInstance> instances) {
+    super(stateMachine, executor, scheduledExecutor, instances);
   }
 
   @Override
   public void start() {
-    while (!aborted) {
+    trackTask = scheduledExecutor.scheduleAtFixedRate(this::fetchStateAndUpdate, 0, STATE_FETCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void abort() {
+    if (trackTask != null) {
+      trackTask.cancel(true);
+    }
+  }
+
+  private void fetchStateAndUpdate() {
+    for (FragmentInstance instance : instances) {
       try {
-        Future<Boolean> future =
-            executor.submit(
-                () -> {
-                  for (FragmentInstance instance : instances) {
-                    try {
-                      FragmentInstanceState state = fetchState(instance);
-                      stateMachine.updateFragInstanceState(instance.getId(), state);
-                    } catch (TException e) {
-                      // TODO: do nothing ?
-                      return false;
-                    }
-                  }
-                  return true;
-                });
-        future.get();
-        Thread.sleep(STATE_FETCH_INTERVAL_IN_MS);
-      } catch (InterruptedException | ExecutionException e) {
-        // TODO:
+        FragmentInstanceState state = fetchState(instance);
+        if (state != null) {
+          stateMachine.updateFragInstanceState(instance.getId(), state);
+        }
+      } catch (TException e) {
+        // TODO: do nothing ?
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateTracker.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateTracker.java
index 3c14ee27ab..05dbc388bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IFragInstanceStateTracker.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-public interface IFragInstanceStateFetcher {
+public interface IFragInstanceStateTracker {
   void start();
+
+  void abort();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index e348836021..315620643e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.Coordinator;
-import org.apache.iotdb.db.mpp.execution.ExecutionStatus;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
@@ -1660,7 +1660,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId(false);
-      ExecutionStatus result =
+      ExecutionResult result =
           coordinator.execute(
               statement,
               new QueryId(String.valueOf(queryId)),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index 86527e2788..e38952fea3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.sql.plan;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -47,8 +48,9 @@ public class QueryPlannerTest {
     QueryExecution queryExecution =
         new QueryExecution(
             stmt,
-            new MPPQueryContext(
-                querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ));
+            new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ),
+            IoTDBThreadPoolFactory.newSingleThreadExecutor("Test-Query"),
+            IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Test-Query-Scheduled"));
     queryExecution.doLogicalPlan();
     System.out.printf("SQL: %s%n%n", querySql);
     System.out.println("===== Step 1: Logical Plan =====");