You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/28 06:09:42 UTC
[iotdb] branch master updated: Add javadoc and rename some class in FragmentInstanceManager and Driver related (#5701)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 29944af738 Add javadoc and rename some class in FragmentInstanceManager and Driver related (#5701)
29944af738 is described below
commit 29944af7386d568cc5f4056a9fa09720b612d1d6
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Thu Apr 28 14:09:36 2022 +0800
Add javadoc and rename some class in FragmentInstanceManager and Driver related (#5701)
---
.../apache/iotdb/db/mpp/execution/DataDriver.java | 4 +-
.../org/apache/iotdb/db/mpp/execution/Driver.java | 30 ----
.../mpp/execution/FragmentInstanceExecution.java | 16 +-
.../db/mpp/execution/FragmentInstanceManager.java | 6 +-
.../org/apache/iotdb/db/mpp/execution/IDriver.java | 31 ++++
...ractExecutor.java => AbstractDriverThread.java} | 19 ++-
...InstanceScheduler.java => DriverScheduler.java} | 126 ++++++++-------
...anceTaskExecutor.java => DriverTaskThread.java} | 12 +-
...l.java => DriverTaskTimeoutSentinelThread.java} | 12 +-
.../iotdb/db/mpp/schedule/ExecutionContext.java | 4 +-
...nstanceScheduler.java => IDriverScheduler.java} | 7 +-
.../iotdb/db/mpp/schedule/ITaskScheduler.java | 35 ++---
.../{FragmentInstanceTask.java => DriverTask.java} | 40 +++--
...agmentInstanceTaskID.java => DriverTaskID.java} | 8 +-
...stanceTaskStatus.java => DriverTaskStatus.java} | 4 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 4 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 4 +-
.../iotdb/db/mpp/execution/DataDriverTest.java | 2 +-
.../db/mpp/schedule/DefaultTaskSchedulerTest.java | 169 ++++++++++-----------
...SchedulerTest.java => DriverSchedulerTest.java} | 52 +++----
...va => DriverTaskTimeoutSentinelThreadTest.java} | 115 ++++++--------
21 files changed, 327 insertions(+), 373 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index e6e8e8083a..899d9d2e0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -99,7 +99,7 @@ public class DataDriver extends Driver {
List<DataSourceOperator> sourceOperators =
((DataDriverContext) driverContext).getSourceOperators();
if (sourceOperators != null && !sourceOperators.isEmpty()) {
- QueryDataSource dataSource = initQueryDataSourceCache();
+ QueryDataSource dataSource = initQueryDataSource();
sourceOperators.forEach(
sourceOperator -> {
// construct QueryDataSource for source operator
@@ -119,7 +119,7 @@ public class DataDriver extends Driver {
* The method is called in mergeLock() when executing query. This method will get all the
* QueryDataSource needed for this query
*/
- public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
+ private QueryDataSource initQueryDataSource() throws QueryProcessException {
DataDriverContext context = (DataDriverContext) driverContext;
DataRegion dataRegion = context.getDataRegion();
dataRegion.readLock();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index 40da49ff15..ae220e4911 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@ -47,10 +47,6 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.Boolean.TRUE;
import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
-/**
- * Driver encapsulates some methods which are necessary for execution scheduler to run a fragment
- * instance
- */
public abstract class Driver implements IDriver {
protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
@@ -83,11 +79,6 @@ public abstract class Driver implements IDriver {
driverBlockedFuture.set(future);
}
- /**
- * Used to judge whether this fragment instance should be scheduled for execution anymore
- *
- * @return true if the FragmentInstance is done or terminated due to failure, otherwise false.
- */
@Override
public boolean isFinished() {
checkLockNotHeld("Cannot check finished status while holding the driver lock");
@@ -107,16 +98,6 @@ public abstract class Driver implements IDriver {
/** release resource this driver used */
protected abstract void releaseResource();
- /**
- * run the fragment instance for {@param duration} time slice, the time of this run is likely not
- * to be equal to {@param duration}, the actual run time should be calculated by the caller
- *
- * @param duration how long should this fragment instance run
- * @return the returned ListenableFuture<Void> is used to represent status of this processing if
- * isDone() return true, meaning that this fragment instance is not blocked and is ready for
- * next processing otherwise, meaning that this fragment instance is blocked and not ready for
- * next processing.
- */
@Override
public ListenableFuture<Void> processFor(Duration duration) {
@@ -154,17 +135,11 @@ public abstract class Driver implements IDriver {
return result.orElse(NOT_BLOCKED);
}
- /**
- * the id information about this Fragment Instance.
- *
- * @return a {@link FragmentInstanceId} instance.
- */
@Override
public FragmentInstanceId getInfo() {
return driverContext.getId();
}
- /** clear resource used by this fragment instance */
@Override
public void close() {
// mark the service for destruction
@@ -178,11 +153,6 @@ public abstract class Driver implements IDriver {
tryWithLockUnInterruptibly(() -> TRUE);
}
- /**
- * fail current driver
- *
- * @param t reason cause this failure
- */
@Override
public void failed(Throwable t) {
driverContext.failed(t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 15c85c4d99..be2fcc6d68 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.schedule.IDriverScheduler;
import com.google.common.collect.ImmutableList;
import io.airlift.stats.CounterStat;
@@ -30,8 +30,6 @@ import static org.apache.iotdb.db.mpp.execution.FragmentInstanceState.FAILED;
public class FragmentInstanceExecution {
- private final IFragmentInstanceScheduler scheduler;
-
private final FragmentInstanceId instanceId;
private final FragmentInstanceContext context;
@@ -44,31 +42,29 @@ public class FragmentInstanceExecution {
private long lastHeartbeat;
public static FragmentInstanceExecution createFragmentInstanceExecution(
- IFragmentInstanceScheduler scheduler,
+ IDriverScheduler scheduler,
FragmentInstanceId instanceId,
FragmentInstanceContext context,
IDriver driver,
FragmentInstanceStateMachine stateMachine,
CounterStat failedInstances) {
FragmentInstanceExecution execution =
- new FragmentInstanceExecution(scheduler, instanceId, context, driver, stateMachine);
- execution.initialize(failedInstances);
+ new FragmentInstanceExecution(instanceId, context, driver, stateMachine);
+ execution.initialize(failedInstances, scheduler);
+ scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver));
return execution;
}
private FragmentInstanceExecution(
- IFragmentInstanceScheduler scheduler,
FragmentInstanceId instanceId,
FragmentInstanceContext context,
IDriver driver,
FragmentInstanceStateMachine stateMachine) {
- this.scheduler = scheduler;
this.instanceId = instanceId;
this.context = context;
this.driver = driver;
this.sinkHandle = driver.getSinkHandle();
this.stateMachine = stateMachine;
- scheduler.submitFragmentInstances(instanceId.getQueryId(), ImmutableList.of(driver));
}
public void recordHeartbeat() {
@@ -101,7 +97,7 @@ public class FragmentInstanceExecution {
}
// this is a separate method to ensure that the `this` reference is not leaked during construction
- private void initialize(CounterStat failedInstances) {
+ private void initialize(CounterStat failedInstances, IDriverScheduler scheduler) {
requireNonNull(failedInstances, "failedInstances is null");
stateMachine.addStateChangeListener(
newState -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 438e1bea0e..a3abfe65e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
-import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.schedule.DriverScheduler;
+import org.apache.iotdb.db.mpp.schedule.IDriverScheduler;
import org.apache.iotdb.db.mpp.sql.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -49,7 +49,7 @@ public class FragmentInstanceManager {
private final Map<FragmentInstanceId, FragmentInstanceContext> instanceContext;
private final Map<FragmentInstanceId, FragmentInstanceExecution> instanceExecution;
private final LocalExecutionPlanner planner = LocalExecutionPlanner.getInstance();
- private final IFragmentInstanceScheduler scheduler = FragmentInstanceScheduler.getInstance();
+ private final IDriverScheduler scheduler = DriverScheduler.getInstance();
private final ScheduledExecutorService instanceManagementExecutor;
private final ExecutorService instanceNotificationExecutor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
index 54c31901a2..61e96dbe42 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IDriver.java
@@ -24,17 +24,48 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
+/**
+ * IDriver encapsulates the methods which are necessary for DriverTaskThread to run a fragment
+ * instance
+ */
public interface IDriver {
+ /**
+ * Used to judge whether this IDriver should be scheduled for execution anymore
+ *
+ * @return true if the IDriver is done or terminated due to failure, otherwise false.
+ */
boolean isFinished();
+ /**
+ * run the IDriver for a time slice of {@code duration}. The time of this run is likely not to be
+ * equal to {@code duration}, so the actual run time should be calculated by the caller
+ *
+ * @param duration how long should this IDriver run
+ * @return a {@code ListenableFuture<Void>} that represents the status of this processing: if
+ * isDone() returns true, this IDriver is not blocked and is ready for the next
+ * processing. Otherwise, this IDriver is blocked and is not ready for the next
+ * processing.
+ */
ListenableFuture<Void> processFor(Duration duration);
+ /**
+ * the id information about this IDriver.
+ *
+ * @return a {@link FragmentInstanceId} instance.
+ */
FragmentInstanceId getInfo();
+ /** clear resource used by this fragment instance */
void close();
+ /**
+ * fail current driver
+ *
+ * @param t the reason that caused this failure
+ */
void failed(Throwable t);
+ /** @return the SinkHandle of the current IDriver */
ISinkHandle getSinkHandle();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractDriverThread.java
similarity index 77%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractExecutor.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractDriverThread.java
index d3df9bbb1e..c79304b0aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractDriverThread.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.schedule;
import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,18 +28,18 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
-/** an abstract executor for {@link FragmentInstanceTask} */
-public abstract class AbstractExecutor extends Thread implements Closeable {
+/** an abstract executor for {@link DriverTask} */
+public abstract class AbstractDriverThread extends Thread implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(AbstractExecutor.class);
- private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+ private static final Logger logger = LoggerFactory.getLogger(AbstractDriverThread.class);
+ private final IndexedBlockingQueue<DriverTask> queue;
protected final ITaskScheduler scheduler;
private volatile boolean closed;
- public AbstractExecutor(
+ public AbstractDriverThread(
String workerId,
ThreadGroup tg,
- IndexedBlockingQueue<FragmentInstanceTask> queue,
+ IndexedBlockingQueue<DriverTask> queue,
ITaskScheduler scheduler) {
super(tg, workerId);
this.queue = queue;
@@ -51,7 +51,7 @@ public abstract class AbstractExecutor extends Thread implements Closeable {
public void run() {
while (!closed && !Thread.currentThread().isInterrupted()) {
try {
- FragmentInstanceTask next = queue.poll();
+ DriverTask next = queue.poll();
execute(next);
} catch (InterruptedException e) {
break;
@@ -62,8 +62,7 @@ public abstract class AbstractExecutor extends Thread implements Closeable {
}
/** Processing a task. */
- protected abstract void execute(FragmentInstanceTask task)
- throws InterruptedException, ExecutionException;
+ protected abstract void execute(DriverTask task) throws InterruptedException, ExecutionException;
@Override
public void close() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverScheduler.java
similarity index 65%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverScheduler.java
index 880041924a..fadf51400d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverScheduler.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.db.mpp.execution.IDriver;
import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
import org.apache.iotdb.db.mpp.schedule.queue.L2PriorityQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskID;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskID;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskStatus;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.slf4j.Logger;
@@ -49,18 +49,18 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/** the manager of fragment instances scheduling */
-public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IService {
+public class DriverScheduler implements IDriverScheduler, IService {
- private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceScheduler.class);
+ private static final Logger logger = LoggerFactory.getLogger(DriverScheduler.class);
- public static FragmentInstanceScheduler getInstance() {
+ public static DriverScheduler getInstance() {
return InstanceHolder.instance;
}
- private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
- private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
- private final Set<FragmentInstanceTask> blockedTasks;
- private final Map<QueryId, Set<FragmentInstanceTask>> queryMap;
+ private final IndexedBlockingQueue<DriverTask> readyQueue;
+ private final IndexedBlockingQueue<DriverTask> timeoutQueue;
+ private final Set<DriverTask> blockedTasks;
+ private final Map<QueryId, Set<DriverTask>> queryMap;
private final ITaskScheduler scheduler;
private IDataBlockManager blockManager; // TODO: init with real IDataBlockManager
@@ -68,17 +68,14 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
private static final int QUERY_TIMEOUT_MS = 10_000; // TODO: load from config files or requests
private final ThreadGroup workerGroups;
- private final List<AbstractExecutor> threads;
+ private final List<AbstractDriverThread> threads;
- private FragmentInstanceScheduler() {
+ private DriverScheduler() {
this.readyQueue =
new L2PriorityQueue<>(
- MAX_CAPACITY,
- new FragmentInstanceTask.SchedulePriorityComparator(),
- new FragmentInstanceTask());
+ MAX_CAPACITY, new DriverTask.SchedulePriorityComparator(), new DriverTask());
this.timeoutQueue =
- new L1PriorityQueue<>(
- MAX_CAPACITY, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+ new L1PriorityQueue<>(MAX_CAPACITY, new DriverTask.TimeoutComparator(), new DriverTask());
this.queryMap = new ConcurrentHashMap<>();
this.blockedTasks = Collections.synchronizedSet(new HashSet<>());
this.scheduler = new Scheduler();
@@ -90,14 +87,13 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
@Override
public void start() throws StartupException {
for (int i = 0; i < WORKER_THREAD_NUM; i++) {
- AbstractExecutor t =
- new FragmentInstanceTaskExecutor(
- "Worker-Thread-" + i, workerGroups, readyQueue, scheduler);
+ AbstractDriverThread t =
+ new DriverTaskThread("Worker-Thread-" + i, workerGroups, readyQueue, scheduler);
threads.add(t);
t.start();
}
- AbstractExecutor t =
- new FragmentInstanceTimeoutSentinel(
+ AbstractDriverThread t =
+ new DriverTaskTimeoutSentinelThread(
"Sentinel-Thread", workerGroups, timeoutQueue, scheduler);
threads.add(t);
t.start();
@@ -121,20 +117,18 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
}
@Override
- public void submitFragmentInstances(QueryId queryId, List<IDriver> instances) {
- List<FragmentInstanceTask> tasks =
+ public void submitDrivers(QueryId queryId, List<IDriver> instances) {
+ List<DriverTask> tasks =
instances.stream()
- .map(
- v ->
- new FragmentInstanceTask(v, QUERY_TIMEOUT_MS, FragmentInstanceTaskStatus.READY))
+ .map(v -> new DriverTask(v, QUERY_TIMEOUT_MS, DriverTaskStatus.READY))
.collect(Collectors.toList());
queryMap
.computeIfAbsent(queryId, v -> Collections.synchronizedSet(new HashSet<>()))
.addAll(tasks);
- for (FragmentInstanceTask task : tasks) {
+ for (DriverTask task : tasks) {
task.lock();
try {
- if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+ if (task.getStatus() != DriverTaskStatus.READY) {
continue;
}
timeoutQueue.push(task);
@@ -147,13 +141,13 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
@Override
public void abortQuery(QueryId queryId) {
- Set<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(queryId);
+ Set<DriverTask> queryRelatedTasks = queryMap.remove(queryId);
if (queryRelatedTasks != null) {
- for (FragmentInstanceTask task : queryRelatedTasks) {
+ for (DriverTask task : queryRelatedTasks) {
task.lock();
try {
task.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED);
- clearFragmentInstanceTask(task);
+ clearDriverTask(task);
} finally {
task.unlock();
}
@@ -163,14 +157,14 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
@Override
public void abortFragmentInstance(FragmentInstanceId instanceId) {
- FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId));
+ DriverTask task = timeoutQueue.get(new DriverTaskID(instanceId));
if (task == null) {
return;
}
task.lock();
try {
task.setAbortCause(FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED);
- clearFragmentInstanceTask(task);
+ clearDriverTask(task);
} finally {
task.unlock();
}
@@ -178,7 +172,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
@Override
public double getSchedulePriority(FragmentInstanceId instanceId) {
- FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId));
+ DriverTask task = timeoutQueue.get(new DriverTaskID(instanceId));
if (task == null) {
throw new IllegalStateException(
"the fragmentInstance " + instanceId.getFullId() + " has been cleared");
@@ -186,9 +180,9 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
return task.getSchedulePriority();
}
- private void clearFragmentInstanceTask(FragmentInstanceTask task) {
- if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
- task.setStatus(FragmentInstanceTaskStatus.ABORTED);
+ private void clearDriverTask(DriverTask task) {
+ if (task.getStatus() != DriverTaskStatus.FINISHED) {
+ task.setStatus(DriverTaskStatus.ABORTED);
}
if (task.getAbortCause() != null) {
task.getFragmentInstance()
@@ -196,7 +190,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
new FragmentInstanceAbortedException(
task.getFragmentInstance().getInfo(), task.getAbortCause()));
}
- if (task.getStatus() == FragmentInstanceTaskStatus.ABORTED) {
+ if (task.getStatus() == DriverTaskStatus.ABORTED) {
blockManager.forceDeregisterFragmentInstance(
new TFragmentInstanceId(
task.getId().getQueryId().getId(),
@@ -206,7 +200,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
readyQueue.remove(task.getId());
timeoutQueue.remove(task.getId());
blockedTasks.remove(task);
- Set<FragmentInstanceTask> tasks = queryMap.get(task.getId().getQueryId());
+ Set<DriverTask> tasks = queryMap.get(task.getId().getQueryId());
if (tasks != null) {
tasks.remove(task);
if (tasks.isEmpty()) {
@@ -220,22 +214,22 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
}
@TestOnly
- IndexedBlockingQueue<FragmentInstanceTask> getReadyQueue() {
+ IndexedBlockingQueue<DriverTask> getReadyQueue() {
return readyQueue;
}
@TestOnly
- IndexedBlockingQueue<FragmentInstanceTask> getTimeoutQueue() {
+ IndexedBlockingQueue<DriverTask> getTimeoutQueue() {
return timeoutQueue;
}
@TestOnly
- Set<FragmentInstanceTask> getBlockedTasks() {
+ Set<DriverTask> getBlockedTasks() {
return blockedTasks;
}
@TestOnly
- Map<QueryId, Set<FragmentInstanceTask>> getQueryMap() {
+ Map<QueryId, Set<DriverTask>> getQueryMap() {
return queryMap;
}
@@ -248,18 +242,18 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
private InstanceHolder() {}
- private static final FragmentInstanceScheduler instance = new FragmentInstanceScheduler();
+ private static final DriverScheduler instance = new DriverScheduler();
}
/** the default scheduler implementation */
private class Scheduler implements ITaskScheduler {
@Override
- public void blockedToReady(FragmentInstanceTask task) {
+ public void blockedToReady(DriverTask task) {
task.lock();
try {
- if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+ if (task.getStatus() != DriverTaskStatus.BLOCKED) {
return;
}
- task.setStatus(FragmentInstanceTaskStatus.READY);
+ task.setStatus(DriverTaskStatus.READY);
readyQueue.push(task);
blockedTasks.remove(task);
} finally {
@@ -268,13 +262,13 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
}
@Override
- public boolean readyToRunning(FragmentInstanceTask task) {
+ public boolean readyToRunning(DriverTask task) {
task.lock();
try {
- if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+ if (task.getStatus() != DriverTaskStatus.READY) {
return false;
}
- task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+ task.setStatus(DriverTaskStatus.RUNNING);
} finally {
task.unlock();
}
@@ -282,14 +276,14 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
}
@Override
- public void runningToReady(FragmentInstanceTask task, ExecutionContext context) {
+ public void runningToReady(DriverTask task, ExecutionContext context) {
task.lock();
try {
- if (task.getStatus() != FragmentInstanceTaskStatus.RUNNING) {
+ if (task.getStatus() != DriverTaskStatus.RUNNING) {
return;
}
task.updateSchedulePriority(context);
- task.setStatus(FragmentInstanceTaskStatus.READY);
+ task.setStatus(DriverTaskStatus.READY);
readyQueue.push(task);
} finally {
task.unlock();
@@ -297,14 +291,14 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
}
@Override
- public void runningToBlocked(FragmentInstanceTask task, ExecutionContext context) {
+ public void runningToBlocked(DriverTask task, ExecutionContext context) {
task.lock();
try {
- if (task.getStatus() != FragmentInstanceTaskStatus.RUNNING) {
+ if (task.getStatus() != DriverTaskStatus.RUNNING) {
return;
}
task.updateSchedulePriority(context);
- task.setStatus(FragmentInstanceTaskStatus.BLOCKED);
+ task.setStatus(DriverTaskStatus.BLOCKED);
blockedTasks.add(task);
} finally {
task.unlock();
@@ -312,22 +306,22 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
}
@Override
- public void runningToFinished(FragmentInstanceTask task, ExecutionContext context) {
+ public void runningToFinished(DriverTask task, ExecutionContext context) {
task.lock();
try {
- if (task.getStatus() != FragmentInstanceTaskStatus.RUNNING) {
+ if (task.getStatus() != DriverTaskStatus.RUNNING) {
return;
}
task.updateSchedulePriority(context);
- task.setStatus(FragmentInstanceTaskStatus.FINISHED);
- clearFragmentInstanceTask(task);
+ task.setStatus(DriverTaskStatus.FINISHED);
+ clearDriverTask(task);
} finally {
task.unlock();
}
}
@Override
- public void toAborted(FragmentInstanceTask task) {
+ public void toAborted(DriverTask task) {
task.lock();
try {
// If a task is already in an end state, it indicates that the task is finalized in other
@@ -338,21 +332,21 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
logger.warn(
"The task {} is aborted. All other tasks in the same query will be cancelled",
task.getId().toString());
- clearFragmentInstanceTask(task);
+ clearDriverTask(task);
} finally {
task.unlock();
}
QueryId queryId = task.getId().getQueryId();
- Set<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(queryId);
+ Set<DriverTask> queryRelatedTasks = queryMap.remove(queryId);
if (queryRelatedTasks != null) {
- for (FragmentInstanceTask otherTask : queryRelatedTasks) {
+ for (DriverTask otherTask : queryRelatedTasks) {
if (task.equals(otherTask)) {
continue;
}
otherTask.lock();
try {
otherTask.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED);
- clearFragmentInstanceTask(otherTask);
+ clearDriverTask(otherTask);
} finally {
otherTask.unlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskThread.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskThread.java
index 25b4e29a0a..9c227cf100 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskThread.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.schedule;
import org.apache.iotdb.db.mpp.execution.IDriver;
import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
import org.apache.iotdb.db.utils.stats.CpuTimer;
import com.google.common.util.concurrent.ListenableFuture;
@@ -30,24 +30,24 @@ import io.airlift.units.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-/** the worker thread of {@link FragmentInstanceTask} */
-public class FragmentInstanceTaskExecutor extends AbstractExecutor {
+/** the worker thread of {@link DriverTask} */
+public class DriverTaskThread extends AbstractDriverThread {
public static final Duration EXECUTION_TIME_SLICE = new Duration(100, TimeUnit.MILLISECONDS);
// As the callback is lightweight enough, there's no need to use another one thread to execute.
private static final Executor listeningExecutor = MoreExecutors.directExecutor();
- public FragmentInstanceTaskExecutor(
+ public DriverTaskThread(
String workerId,
ThreadGroup tg,
- IndexedBlockingQueue<FragmentInstanceTask> queue,
+ IndexedBlockingQueue<DriverTask> queue,
ITaskScheduler scheduler) {
super(workerId, tg, queue, scheduler);
}
@Override
- public void execute(FragmentInstanceTask task) throws InterruptedException {
+ public void execute(DriverTask task) throws InterruptedException {
// try to switch it to RUNNING
if (!scheduler.readyToRunning(task)) {
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThread.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThread.java
index e7aaaf4e47..d0b691f09d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThread.java
@@ -19,21 +19,21 @@
package org.apache.iotdb.db.mpp.schedule;
import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
-/** the thread for watching the timeout of {@link FragmentInstanceTask} */
-public class FragmentInstanceTimeoutSentinel extends AbstractExecutor {
+/** the thread for watching the timeout of {@link DriverTask} */
+public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
- public FragmentInstanceTimeoutSentinel(
+ public DriverTaskTimeoutSentinelThread(
String workerId,
ThreadGroup tg,
- IndexedBlockingQueue<FragmentInstanceTask> queue,
+ IndexedBlockingQueue<DriverTask> queue,
ITaskScheduler scheduler) {
super(workerId, tg, queue, scheduler);
}
@Override
- public void execute(FragmentInstanceTask task) throws InterruptedException {
+ public void execute(DriverTask task) throws InterruptedException {
task.lock();
try {
// if this task is already in an end state, it means that the resource releasing will be
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
index e8cd091d2d..10d9f3d597 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
@@ -18,12 +18,12 @@
*/
package org.apache.iotdb.db.mpp.schedule;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
import org.apache.iotdb.db.utils.stats.CpuTimer;
import io.airlift.units.Duration;
-/** The execution context of a {@link FragmentInstanceTask} */
+/** The execution context of a {@link DriverTask} */
public class ExecutionContext {
private CpuTimer.CpuDuration cpuDuration;
private Duration timeSlice;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IDriverScheduler.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/IDriverScheduler.java
index b8d5970ee9..30dc6aa38c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IDriverScheduler.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.mpp.execution.IDriver;
import java.util.List;
/** the interface of fragment instance scheduling */
-public interface IFragmentInstanceScheduler {
+public interface IDriverScheduler {
/**
* Submit one or more {@link org.apache.iotdb.db.mpp.execution.IDriver} in one query for later
@@ -34,7 +34,7 @@ public interface IFragmentInstanceScheduler {
* @param queryId the queryId these instances belong to.
* @param instances the submitted instances.
*/
- void submitFragmentInstances(QueryId queryId, List<IDriver> instances);
+ void submitDrivers(QueryId queryId, List<IDriver> instances);
/**
* Abort all the instances in this query.
@@ -44,7 +44,8 @@ public interface IFragmentInstanceScheduler {
void abortQuery(QueryId queryId);
/**
- * Abort the fragment instance. If the instance is not existed, nothing will happen.
+ * Abort all Drivers of the fragment instance. If the instance does not exist, nothing will
+ * happen.
*
* @param instanceId the id of the fragment instance to be aborted.
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java
index f4a008b4a2..641aba8c7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java
@@ -18,60 +18,55 @@
*/
package org.apache.iotdb.db.mpp.schedule;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskStatus;
-/** the scheduler interface of {@link FragmentInstanceTask} */
+/** the scheduler interface of {@link DriverTask} */
public interface ITaskScheduler {
/**
- * Switch a task from {@link FragmentInstanceTaskStatus#BLOCKED} to {@link
- * FragmentInstanceTaskStatus#READY}.
+ * Switch a task from {@link DriverTaskStatus#BLOCKED} to {@link DriverTaskStatus#READY}.
*
* @param task the task to be switched.
*/
- void blockedToReady(FragmentInstanceTask task);
+ void blockedToReady(DriverTask task);
/**
- * Switch a task from {@link FragmentInstanceTaskStatus#READY} to {@link
- * FragmentInstanceTaskStatus#RUNNING}.
+ * Switch a task from {@link DriverTaskStatus#READY} to {@link DriverTaskStatus#RUNNING}.
*
* @param task the task to be switched.
* @return true if it's switched to the target status successfully, otherwise false.
*/
- boolean readyToRunning(FragmentInstanceTask task);
+ boolean readyToRunning(DriverTask task);
/**
- * Switch a task from {@link FragmentInstanceTaskStatus#RUNNING} to {@link
- * FragmentInstanceTaskStatus#READY}.
+ * Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#READY}.
*
* @param task the task to be switched.
* @param context the execution context of last running.
*/
- void runningToReady(FragmentInstanceTask task, ExecutionContext context);
+ void runningToReady(DriverTask task, ExecutionContext context);
/**
- * Switch a task from {@link FragmentInstanceTaskStatus#RUNNING} to {@link
- * FragmentInstanceTaskStatus#BLOCKED}.
+ * Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#BLOCKED}.
*
* @param task the task to be switched.
* @param context the execution context of last running.
*/
- void runningToBlocked(FragmentInstanceTask task, ExecutionContext context);
+ void runningToBlocked(DriverTask task, ExecutionContext context);
/**
- * Switch a task from {@link FragmentInstanceTaskStatus#RUNNING} to {@link
- * FragmentInstanceTaskStatus#FINISHED}.
+ * Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#FINISHED}.
*
* @param task the task to be switched.
* @param context the execution context of last running.
*/
- void runningToFinished(FragmentInstanceTask task, ExecutionContext context);
+ void runningToFinished(DriverTask task, ExecutionContext context);
/**
- * Switch a task to {@link FragmentInstanceTaskStatus#ABORTED}.
+ * Switch a task to {@link DriverTaskStatus#ABORTED}.
*
* @param task the task to be switched.
*/
- void toAborted(FragmentInstanceTask task);
+ void toAborted(DriverTask task);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTask.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTask.java
index fc322fee28..e008f8df1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTask.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.IDriver;
+import org.apache.iotdb.db.mpp.schedule.DriverTaskThread;
import org.apache.iotdb.db.mpp.schedule.ExecutionContext;
-import org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor;
import org.apache.iotdb.db.mpp.schedule.queue.ID;
import org.apache.iotdb.db.mpp.schedule.queue.IDIndexedAccessible;
@@ -36,14 +36,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-/**
- * the scheduling element of {@link FragmentInstanceTaskExecutor}. It wraps a single
- * FragmentInstance.
- */
-public class FragmentInstanceTask implements IDIndexedAccessible {
+/** the scheduling element of {@link DriverTaskThread}. It wraps a single Driver. */
+public class DriverTask implements IDIndexedAccessible {
- private FragmentInstanceTaskID id;
- private FragmentInstanceTaskStatus status;
+ private DriverTaskID id;
+ private DriverTaskStatus status;
private final IDriver fragmentInstance;
// the higher this field is, the higher probability it will be scheduled.
@@ -57,42 +54,41 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
private String abortCause;
/** Initialize a dummy instance for queryHolder */
- public FragmentInstanceTask() {
+ public DriverTask() {
this(new StubFragmentInstance(), 0L, null);
}
- public FragmentInstanceTask(IDriver instance, long timeoutMs, FragmentInstanceTaskStatus status) {
+ public DriverTask(IDriver instance, long timeoutMs, DriverTaskStatus status) {
this.fragmentInstance = instance;
- this.id = new FragmentInstanceTaskID(instance.getInfo());
+ this.id = new DriverTaskID(instance.getInfo());
this.setStatus(status);
this.schedulePriority = 0.0D;
this.ddl = System.currentTimeMillis() + timeoutMs;
this.lock = new ReentrantLock();
}
- public FragmentInstanceTaskID getId() {
+ public DriverTaskID getId() {
return id;
}
@Override
public void setId(ID id) {
- this.id = (FragmentInstanceTaskID) id;
+ this.id = (DriverTaskID) id;
}
- public FragmentInstanceTaskStatus getStatus() {
+ public DriverTaskStatus getStatus() {
return status;
}
public boolean isEndState() {
- return status == FragmentInstanceTaskStatus.ABORTED
- || status == FragmentInstanceTaskStatus.FINISHED;
+ return status == DriverTaskStatus.ABORTED || status == DriverTaskStatus.FINISHED;
}
public IDriver getFragmentInstance() {
return fragmentInstance;
}
- public void setStatus(FragmentInstanceTaskStatus status) {
+ public void setStatus(DriverTaskStatus status) {
this.status = status;
}
@@ -139,7 +135,7 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
@Override
public boolean equals(Object o) {
- return o instanceof FragmentInstanceTask && ((FragmentInstanceTask) o).getId().equals(id);
+ return o instanceof DriverTask && ((DriverTask) o).getId().equals(id);
}
public String getAbortCause() {
@@ -151,10 +147,10 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
}
/** a comparator of ddl, the less the ddl is, the low order it has. */
- public static class TimeoutComparator implements Comparator<FragmentInstanceTask> {
+ public static class TimeoutComparator implements Comparator<DriverTask> {
@Override
- public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+ public int compare(DriverTask o1, DriverTask o2) {
if (o1.getId().equals(o2.getId())) {
return 0;
}
@@ -169,10 +165,10 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
}
/** a comparator of ddl, the higher the schedulePriority is, the low order it has. */
- public static class SchedulePriorityComparator implements Comparator<FragmentInstanceTask> {
+ public static class SchedulePriorityComparator implements Comparator<DriverTask> {
@Override
- public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+ public int compare(DriverTask o1, DriverTask o2) {
if (o1.getId().equals(o2.getId())) {
return 0;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskID.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskID.java
index a0081b177c..fe7421667b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskID.java
@@ -26,17 +26,17 @@ import org.apache.iotdb.db.mpp.schedule.queue.ID;
import org.jetbrains.annotations.NotNull;
/** the class of id of the fragment instance task */
-public class FragmentInstanceTaskID implements ID, Comparable<FragmentInstanceTaskID> {
+public class DriverTaskID implements ID, Comparable<DriverTaskID> {
private final FragmentInstanceId id;
- public FragmentInstanceTaskID(FragmentInstanceId id) {
+ public DriverTaskID(FragmentInstanceId id) {
this.id = id;
}
@Override
public boolean equals(Object o) {
- return o instanceof FragmentInstanceTaskID && ((FragmentInstanceTaskID) o).id.equals(id);
+ return o instanceof DriverTaskID && ((DriverTaskID) o).id.equals(id);
}
@Override
@@ -62,7 +62,7 @@ public class FragmentInstanceTaskID implements ID, Comparable<FragmentInstanceTa
// This is the default comparator of FragmentInstanceID
@Override
- public int compareTo(@NotNull FragmentInstanceTaskID o) {
+ public int compareTo(@NotNull DriverTaskID o) {
return String.CASE_INSENSITIVE_ORDER.compare(this.toString(), o.toString());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskStatus.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskStatus.java
index f50dc4dfdc..09dbeac9c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/DriverTaskStatus.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.db.mpp.schedule.task;
-/** the status enum of {@link FragmentInstanceTask} */
-public enum FragmentInstanceTaskStatus {
+/** the status enum of {@link DriverTask} */
+public enum DriverTaskStatus {
/* Ready to be executed */
READY,
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index c2c835d0e8..a6b63d46e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -44,7 +44,7 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
-import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.schedule.DriverScheduler;
import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
import org.apache.iotdb.db.protocol.rest.RestService;
import org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService;
@@ -251,7 +251,7 @@ public class DataNode implements DataNodeMBean {
registerManager.register(StorageEngineV2.getInstance());
registerManager.register(DataBlockService.getInstance());
registerManager.register(InternalService.getInstance());
- registerManager.register(FragmentInstanceScheduler.getInstance());
+ registerManager.register(DriverScheduler.getInstance());
IoTDBDescriptor.getInstance()
.getConfig()
.setRpcImplClassName(DataNodeTSIServiceImpl.class.getName());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index ae8b8d0b1e..d60c167a00 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -40,7 +40,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.mpp.buffer.DataBlockService;
-import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.schedule.DriverScheduler;
import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
import org.apache.iotdb.db.protocol.rest.RestService;
import org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService;
@@ -147,7 +147,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(StorageEngineV2.getInstance());
registerManager.register(DataBlockService.getInstance());
registerManager.register(InternalService.getInstance());
- registerManager.register(FragmentInstanceScheduler.getInstance());
+ registerManager.register(DriverScheduler.getInstance());
IoTDBDescriptor.getInstance()
.getConfig()
.setRpcImplClassName(DataNodeTSIServiceImpl.class.getName());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 65cca8e51c..5f03cd366a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -61,7 +61,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.FragmentInstanceContext.createFragmentInstanceContext;
-import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.schedule.DriverTaskThread.EXECUTION_TIME_SLICE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index a93ab988ea..213f742e69 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.IDriver;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskStatus;
import org.apache.iotdb.db.utils.stats.CpuTimer;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit;
public class DefaultTaskSchedulerTest {
- private final FragmentInstanceScheduler manager = FragmentInstanceScheduler.getInstance();
+ private final DriverScheduler manager = DriverScheduler.getInstance();
@After
public void tearDown() {
@@ -57,17 +57,17 @@ public class DefaultTaskSchedulerTest {
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- FragmentInstanceTaskStatus[] invalidStates =
- new FragmentInstanceTaskStatus[] {
- FragmentInstanceTaskStatus.FINISHED,
- FragmentInstanceTaskStatus.ABORTED,
- FragmentInstanceTaskStatus.READY,
- FragmentInstanceTaskStatus.RUNNING,
+ DriverTaskStatus[] invalidStates =
+ new DriverTaskStatus[] {
+ DriverTaskStatus.FINISHED,
+ DriverTaskStatus.ABORTED,
+ DriverTaskStatus.READY,
+ DriverTaskStatus.RUNNING,
};
- for (FragmentInstanceTaskStatus status : invalidStates) {
- FragmentInstanceTask testTask = new FragmentInstanceTask(mockDriver, 100L, status);
+ for (DriverTaskStatus status : invalidStates) {
+ DriverTask testTask = new DriverTask(mockDriver, 100L, status);
manager.getBlockedTasks().add(testTask);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
manager.getQueryMap().put(queryId, taskSet);
manager.getTimeoutQueue().push(testTask);
@@ -80,15 +80,14 @@ public class DefaultTaskSchedulerTest {
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
clear();
}
- FragmentInstanceTask testTask =
- new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.BLOCKED);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
manager.getBlockedTasks().add(testTask);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
manager.getQueryMap().put(queryId, taskSet);
manager.getTimeoutQueue().push(testTask);
defaultScheduler.blockedToReady(testTask);
- Assert.assertEquals(FragmentInstanceTaskStatus.READY, testTask.getStatus());
+ Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
@@ -109,16 +108,16 @@ public class DefaultTaskSchedulerTest {
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- FragmentInstanceTaskStatus[] invalidStates =
- new FragmentInstanceTaskStatus[] {
- FragmentInstanceTaskStatus.FINISHED,
- FragmentInstanceTaskStatus.ABORTED,
- FragmentInstanceTaskStatus.BLOCKED,
- FragmentInstanceTaskStatus.RUNNING,
+ DriverTaskStatus[] invalidStates =
+ new DriverTaskStatus[] {
+ DriverTaskStatus.FINISHED,
+ DriverTaskStatus.ABORTED,
+ DriverTaskStatus.BLOCKED,
+ DriverTaskStatus.RUNNING,
};
- for (FragmentInstanceTaskStatus status : invalidStates) {
- FragmentInstanceTask testTask = new FragmentInstanceTask(mockDriver, 100L, status);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ for (DriverTaskStatus status : invalidStates) {
+ DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
manager.getQueryMap().put(queryId, taskSet);
manager.getTimeoutQueue().push(testTask);
@@ -130,14 +129,13 @@ public class DefaultTaskSchedulerTest {
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
clear();
}
- FragmentInstanceTask testTask =
- new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
manager.getQueryMap().put(queryId, taskSet);
manager.getTimeoutQueue().push(testTask);
defaultScheduler.readyToRunning(testTask);
- Assert.assertEquals(FragmentInstanceTaskStatus.RUNNING, testTask.getStatus());
+ Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
@@ -156,16 +154,16 @@ public class DefaultTaskSchedulerTest {
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- FragmentInstanceTaskStatus[] invalidStates =
- new FragmentInstanceTaskStatus[] {
- FragmentInstanceTaskStatus.FINISHED,
- FragmentInstanceTaskStatus.ABORTED,
- FragmentInstanceTaskStatus.BLOCKED,
- FragmentInstanceTaskStatus.READY,
+ DriverTaskStatus[] invalidStates =
+ new DriverTaskStatus[] {
+ DriverTaskStatus.FINISHED,
+ DriverTaskStatus.ABORTED,
+ DriverTaskStatus.BLOCKED,
+ DriverTaskStatus.READY,
};
- for (FragmentInstanceTaskStatus status : invalidStates) {
- FragmentInstanceTask testTask = new FragmentInstanceTask(mockDriver, 100L, status);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ for (DriverTaskStatus status : invalidStates) {
+ DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
manager.getQueryMap().put(queryId, taskSet);
manager.getTimeoutQueue().push(testTask);
@@ -178,9 +176,8 @@ public class DefaultTaskSchedulerTest {
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
clear();
}
- FragmentInstanceTask testTask =
- new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.RUNNING);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
manager.getQueryMap().put(queryId, taskSet);
manager.getTimeoutQueue().push(testTask);
@@ -189,7 +186,7 @@ public class DefaultTaskSchedulerTest {
context.setCpuDuration(new CpuTimer.CpuDuration());
defaultScheduler.runningToReady(testTask, context);
Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
- Assert.assertEquals(FragmentInstanceTaskStatus.READY, testTask.getStatus());
+ Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
Assert.assertNotNull(manager.getReadyQueue().get(testTask.getId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
@@ -209,16 +206,16 @@ public class DefaultTaskSchedulerTest {
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- FragmentInstanceTaskStatus[] invalidStates =
- new FragmentInstanceTaskStatus[] {
- FragmentInstanceTaskStatus.FINISHED,
- FragmentInstanceTaskStatus.ABORTED,
- FragmentInstanceTaskStatus.BLOCKED,
- FragmentInstanceTaskStatus.READY,
+ DriverTaskStatus[] invalidStates =
+ new DriverTaskStatus[] {
+ DriverTaskStatus.FINISHED,
+ DriverTaskStatus.ABORTED,
+ DriverTaskStatus.BLOCKED,
+ DriverTaskStatus.READY,
};
- for (FragmentInstanceTaskStatus status : invalidStates) {
- FragmentInstanceTask testTask = new FragmentInstanceTask(mockDriver, 100L, status);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ for (DriverTaskStatus status : invalidStates) {
+ DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
manager.getQueryMap().put(queryId, taskSet);
manager.getTimeoutQueue().push(testTask);
@@ -231,9 +228,8 @@ public class DefaultTaskSchedulerTest {
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
clear();
}
- FragmentInstanceTask testTask =
- new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.RUNNING);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
manager.getQueryMap().put(queryId, taskSet);
manager.getTimeoutQueue().push(testTask);
@@ -242,7 +238,7 @@ public class DefaultTaskSchedulerTest {
context.setCpuDuration(new CpuTimer.CpuDuration());
defaultScheduler.runningToBlocked(testTask, context);
Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
- Assert.assertEquals(FragmentInstanceTaskStatus.BLOCKED, testTask.getStatus());
+ Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
@@ -262,16 +258,16 @@ public class DefaultTaskSchedulerTest {
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- FragmentInstanceTaskStatus[] invalidStates =
- new FragmentInstanceTaskStatus[] {
- FragmentInstanceTaskStatus.FINISHED,
- FragmentInstanceTaskStatus.ABORTED,
- FragmentInstanceTaskStatus.BLOCKED,
- FragmentInstanceTaskStatus.READY,
+ DriverTaskStatus[] invalidStates =
+ new DriverTaskStatus[] {
+ DriverTaskStatus.FINISHED,
+ DriverTaskStatus.ABORTED,
+ DriverTaskStatus.BLOCKED,
+ DriverTaskStatus.READY,
};
- for (FragmentInstanceTaskStatus status : invalidStates) {
- FragmentInstanceTask testTask = new FragmentInstanceTask(mockDriver, 100L, status);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ for (DriverTaskStatus status : invalidStates) {
+ DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
manager.getQueryMap().put(queryId, taskSet);
manager.getTimeoutQueue().push(testTask);
@@ -284,9 +280,8 @@ public class DefaultTaskSchedulerTest {
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
clear();
}
- FragmentInstanceTask testTask =
- new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.RUNNING);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
manager.getQueryMap().put(queryId, taskSet);
manager.getTimeoutQueue().push(testTask);
@@ -295,7 +290,7 @@ public class DefaultTaskSchedulerTest {
context.setCpuDuration(new CpuTimer.CpuDuration());
defaultScheduler.runningToFinished(testTask, context);
Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
- Assert.assertEquals(FragmentInstanceTaskStatus.FINISHED, testTask.getStatus());
+ Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
@@ -319,15 +314,14 @@ public class DefaultTaskSchedulerTest {
FragmentInstanceId instanceId2 =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-1");
Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
- FragmentInstanceTaskStatus[] invalidStates =
- new FragmentInstanceTaskStatus[] {
- FragmentInstanceTaskStatus.FINISHED, FragmentInstanceTaskStatus.ABORTED,
+ DriverTaskStatus[] invalidStates =
+ new DriverTaskStatus[] {
+ DriverTaskStatus.FINISHED, DriverTaskStatus.ABORTED,
};
- for (FragmentInstanceTaskStatus status : invalidStates) {
- FragmentInstanceTask testTask1 = new FragmentInstanceTask(mockDriver1, 100L, status);
- FragmentInstanceTask testTask2 =
- new FragmentInstanceTask(mockDriver2, 100L, FragmentInstanceTaskStatus.BLOCKED);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ for (DriverTaskStatus status : invalidStates) {
+ DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
+ DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask1);
taskSet.add(testTask2);
manager.getQueryMap().put(queryId, taskSet);
@@ -337,7 +331,7 @@ public class DefaultTaskSchedulerTest {
defaultScheduler.toAborted(testTask1);
Assert.assertEquals(status, testTask1.getStatus());
- Assert.assertEquals(FragmentInstanceTaskStatus.BLOCKED, testTask2.getStatus());
+ Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask2.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
Assert.assertTrue(manager.getBlockedTasks().contains(testTask2));
Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
@@ -352,23 +346,20 @@ public class DefaultTaskSchedulerTest {
Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
clear();
}
- FragmentInstanceTaskStatus[] validStates =
- new FragmentInstanceTaskStatus[] {
- FragmentInstanceTaskStatus.RUNNING,
- FragmentInstanceTaskStatus.READY,
- FragmentInstanceTaskStatus.BLOCKED,
+ DriverTaskStatus[] validStates =
+ new DriverTaskStatus[] {
+ DriverTaskStatus.RUNNING, DriverTaskStatus.READY, DriverTaskStatus.BLOCKED,
};
- for (FragmentInstanceTaskStatus status : validStates) {
+ for (DriverTaskStatus status : validStates) {
Mockito.reset(mockDriver1);
Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
Mockito.reset(mockDriver2);
Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
- FragmentInstanceTask testTask1 = new FragmentInstanceTask(mockDriver1, 100L, status);
+ DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
- FragmentInstanceTask testTask2 =
- new FragmentInstanceTask(mockDriver2, 100L, FragmentInstanceTaskStatus.BLOCKED);
- Set<FragmentInstanceTask> taskSet = new HashSet<>();
+ DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+ Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask1);
taskSet.add(testTask2);
manager.getQueryMap().put(queryId, taskSet);
@@ -381,8 +372,8 @@ public class DefaultTaskSchedulerTest {
Mockito.reset(mockDataBlockManager);
// Aborting one fragment instance may cause the others in the same query to be aborted.
- Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, testTask1.getStatus());
- Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, testTask2.getStatus());
+ Assert.assertEquals(DriverTaskStatus.ABORTED, testTask1.getStatus());
+ Assert.assertEquals(DriverTaskStatus.ABORTED, testTask2.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
Assert.assertFalse(manager.getBlockedTasks().contains(testTask2));
Assert.assertNull(manager.getReadyQueue().get(testTask1.getId()));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverSchedulerTest.java
similarity index 77%
rename from server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverSchedulerTest.java
index f83d1213c6..2dc325fb30 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverSchedulerTest.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.IDriver;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskID;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskID;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskStatus;
import org.junit.After;
import org.junit.Assert;
@@ -36,9 +36,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-public class FragmentInstanceSchedulerTest {
+public class DriverSchedulerTest {
- private final FragmentInstanceScheduler manager = FragmentInstanceScheduler.getInstance();
+ private final DriverScheduler manager = DriverScheduler.getInstance();
@After
public void tearDown() {
@@ -62,40 +62,37 @@ public class FragmentInstanceSchedulerTest {
IDriver mockDriver2 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
- manager.submitFragmentInstances(queryId, instances);
+ manager.submitDrivers(queryId, instances);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertEquals(2, manager.getQueryMap().get(queryId).size());
Assert.assertEquals(2, manager.getTimeoutQueue().size());
Assert.assertEquals(2, manager.getReadyQueue().size());
- FragmentInstanceTask task1 =
- manager.getTimeoutQueue().get(new FragmentInstanceTaskID(instanceId1));
+ DriverTask task1 = manager.getTimeoutQueue().get(new DriverTaskID(instanceId1));
Assert.assertNotNull(task1);
- FragmentInstanceTask task2 =
- manager.getTimeoutQueue().get(new FragmentInstanceTaskID(instanceId2));
+ DriverTask task2 = manager.getTimeoutQueue().get(new DriverTaskID(instanceId2));
Assert.assertNotNull(task2);
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(task1));
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(task2));
- Assert.assertEquals(FragmentInstanceTaskStatus.READY, task1.getStatus());
- Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
+ Assert.assertEquals(DriverTaskStatus.READY, task1.getStatus());
+ Assert.assertEquals(DriverTaskStatus.READY, task2.getStatus());
// Submit another task of the same query
IDriver mockDriver3 = Mockito.mock(IDriver.class);
FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
Mockito.when(mockDriver3.getInfo()).thenReturn(instanceId3);
- manager.submitFragmentInstances(queryId, Collections.singletonList(mockDriver3));
+ manager.submitDrivers(queryId, Collections.singletonList(mockDriver3));
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertEquals(3, manager.getQueryMap().get(queryId).size());
Assert.assertEquals(3, manager.getTimeoutQueue().size());
Assert.assertEquals(3, manager.getReadyQueue().size());
- FragmentInstanceTask task3 =
- manager.getTimeoutQueue().get(new FragmentInstanceTaskID(instanceId3));
+ DriverTask task3 = manager.getTimeoutQueue().get(new DriverTaskID(instanceId3));
Assert.assertNotNull(task3);
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(task3));
- Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus());
+ Assert.assertEquals(DriverTaskStatus.READY, task3.getStatus());
// Submit another task of a different query
QueryId queryId2 = new QueryId("test2");
@@ -103,18 +100,17 @@ public class FragmentInstanceSchedulerTest {
FragmentInstanceId instanceId4 = new FragmentInstanceId(fragmentId2, "inst-0");
IDriver mockDriver4 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver4.getInfo()).thenReturn(instanceId4);
- manager.submitFragmentInstances(queryId2, Collections.singletonList(mockDriver4));
+ manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4));
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(2, manager.getQueryMap().size());
Assert.assertTrue(manager.getQueryMap().containsKey(queryId2));
Assert.assertEquals(1, manager.getQueryMap().get(queryId2).size());
Assert.assertEquals(4, manager.getTimeoutQueue().size());
Assert.assertEquals(4, manager.getReadyQueue().size());
- FragmentInstanceTask task4 =
- manager.getTimeoutQueue().get(new FragmentInstanceTaskID(instanceId4));
+ DriverTask task4 = manager.getTimeoutQueue().get(new DriverTaskID(instanceId4));
Assert.assertNotNull(task4);
Assert.assertTrue(manager.getQueryMap().get(queryId2).contains(task4));
- Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+ Assert.assertEquals(DriverTaskStatus.READY, task4.getStatus());
// Abort one FragmentInstance
Mockito.reset(mockDriver1);
@@ -127,10 +123,10 @@ public class FragmentInstanceSchedulerTest {
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertEquals(3, manager.getTimeoutQueue().size());
Assert.assertEquals(3, manager.getReadyQueue().size());
- Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task1.getStatus());
- Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
- Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus());
- Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+ Assert.assertEquals(DriverTaskStatus.ABORTED, task1.getStatus());
+ Assert.assertEquals(DriverTaskStatus.READY, task2.getStatus());
+ Assert.assertEquals(DriverTaskStatus.READY, task3.getStatus());
+ Assert.assertEquals(DriverTaskStatus.READY, task4.getStatus());
Mockito.verify(mockDriver1, Mockito.times(1)).failed(Mockito.any());
Assert.assertEquals(
FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED, task1.getAbortCause());
@@ -151,10 +147,10 @@ public class FragmentInstanceSchedulerTest {
Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
Assert.assertEquals(1, manager.getTimeoutQueue().size());
Assert.assertEquals(1, manager.getReadyQueue().size());
- Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task1.getStatus());
- Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task2.getStatus());
- Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task3.getStatus());
- Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+ Assert.assertEquals(DriverTaskStatus.ABORTED, task1.getStatus());
+ Assert.assertEquals(DriverTaskStatus.ABORTED, task2.getStatus());
+ Assert.assertEquals(DriverTaskStatus.ABORTED, task3.getStatus());
+ Assert.assertEquals(DriverTaskStatus.READY, task4.getStatus());
Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
Mockito.verify(mockDriver3, Mockito.times(1)).failed(Mockito.any());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThreadTest.java
similarity index 71%
rename from server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThreadTest.java
index 28365962b8..9e6e7b3293 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.IDriver;
import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTask;
+import org.apache.iotdb.db.mpp.schedule.task.DriverTaskStatus;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -36,7 +36,7 @@ import org.mockito.Mockito;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-public class FragmentInstanceTimeoutSentinelTest {
+public class DriverTaskTimeoutSentinelThreadTest {
@Test
public void testHandleInvalidStateTask() throws ExecutionException, InterruptedException {
@@ -44,52 +44,49 @@ public class FragmentInstanceTimeoutSentinelTest {
Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
.thenAnswer(
ans -> {
- FragmentInstanceTask task = ans.getArgument(0);
- if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+ DriverTask task = ans.getArgument(0);
+ if (task.getStatus() != DriverTaskStatus.READY) {
return false;
}
- task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+ task.setStatus(DriverTaskStatus.RUNNING);
return true;
});
QueryId queryId = new QueryId("test");
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
- IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
- new L1PriorityQueue<>(
- 100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+ IndexedBlockingQueue<DriverTask> taskQueue =
+ new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
IDriver mockDriver = Mockito.mock(IDriver.class);
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- AbstractExecutor executor =
- new FragmentInstanceTaskExecutor(
- "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ AbstractDriverThread executor =
+ new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
// FINISHED status test
- FragmentInstanceTask testTask =
- new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.FINISHED);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.FINISHED);
executor.execute(testTask);
- Assert.assertEquals(FragmentInstanceTaskStatus.FINISHED, testTask.getStatus());
+ Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
// ABORTED status test
- testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.ABORTED);
+ testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.ABORTED);
executor.execute(testTask);
- Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, testTask.getStatus());
+ Assert.assertEquals(DriverTaskStatus.ABORTED, testTask.getStatus());
Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
// RUNNING status test
- testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.RUNNING);
+ testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
executor.execute(testTask);
- Assert.assertEquals(FragmentInstanceTaskStatus.RUNNING, testTask.getStatus());
+ Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
// BLOCKED status test
- testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.BLOCKED);
+ testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
executor.execute(testTask);
- Assert.assertEquals(FragmentInstanceTaskStatus.BLOCKED, testTask.getStatus());
+ Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());
Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
@@ -104,16 +101,15 @@ public class FragmentInstanceTimeoutSentinelTest {
Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
.thenAnswer(
ans -> {
- FragmentInstanceTask task = ans.getArgument(0);
- if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+ DriverTask task = ans.getArgument(0);
+ if (task.getStatus() != DriverTaskStatus.READY) {
return false;
}
- task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+ task.setStatus(DriverTaskStatus.RUNNING);
return true;
});
- IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
- new L1PriorityQueue<>(
- 100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+ IndexedBlockingQueue<DriverTask> taskQueue =
+ new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
// Mock the instance with a cancelled future
IDriver mockDriver = Mockito.mock(IDriver.class);
@@ -124,11 +120,9 @@ public class FragmentInstanceTimeoutSentinelTest {
Mockito.when(mockDriver.processFor(Mockito.any()))
.thenReturn(Futures.immediateCancelledFuture());
- AbstractExecutor executor =
- new FragmentInstanceTaskExecutor(
- "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
- FragmentInstanceTask testTask =
- new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
+ AbstractDriverThread executor =
+ new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertEquals(
@@ -146,16 +140,15 @@ public class FragmentInstanceTimeoutSentinelTest {
Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
.thenAnswer(
ans -> {
- FragmentInstanceTask task = ans.getArgument(0);
- if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+ DriverTask task = ans.getArgument(0);
+ if (task.getStatus() != DriverTaskStatus.READY) {
return false;
}
- task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+ task.setStatus(DriverTaskStatus.RUNNING);
return true;
});
- IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
- new L1PriorityQueue<>(
- 100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+ IndexedBlockingQueue<DriverTask> taskQueue =
+ new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
// Mock the instance with an already-completed future and a finished driver
IDriver mockDriver = Mockito.mock(IDriver.class);
@@ -165,11 +158,9 @@ public class FragmentInstanceTimeoutSentinelTest {
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(Futures.immediateVoidFuture());
Mockito.when(mockDriver.isFinished()).thenReturn(true);
- AbstractExecutor executor =
- new FragmentInstanceTaskExecutor(
- "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
- FragmentInstanceTask testTask =
- new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
+ AbstractDriverThread executor =
+ new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());
@@ -186,16 +177,15 @@ public class FragmentInstanceTimeoutSentinelTest {
Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
.thenAnswer(
ans -> {
- FragmentInstanceTask task = ans.getArgument(0);
- if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+ DriverTask task = ans.getArgument(0);
+ if (task.getStatus() != DriverTaskStatus.READY) {
return false;
}
- task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+ task.setStatus(DriverTaskStatus.RUNNING);
return true;
});
- IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
- new L1PriorityQueue<>(
- 100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+ IndexedBlockingQueue<DriverTask> taskQueue =
+ new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
// Mock the instance with a blocked future
ListenableFuture<Void> mockFuture = Mockito.mock(ListenableFuture.class);
@@ -216,11 +206,9 @@ public class FragmentInstanceTimeoutSentinelTest {
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
Mockito.when(mockDriver.isFinished()).thenReturn(false);
- AbstractExecutor executor =
- new FragmentInstanceTaskExecutor(
- "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
- FragmentInstanceTask testTask =
- new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
+ AbstractDriverThread executor =
+ new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());
@@ -237,16 +225,15 @@ public class FragmentInstanceTimeoutSentinelTest {
Mockito.when(mockScheduler.readyToRunning(Mockito.any()))
.thenAnswer(
ans -> {
- FragmentInstanceTask task = ans.getArgument(0);
- if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+ DriverTask task = ans.getArgument(0);
+ if (task.getStatus() != DriverTaskStatus.READY) {
return false;
}
- task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+ task.setStatus(DriverTaskStatus.RUNNING);
return true;
});
- IndexedBlockingQueue<FragmentInstanceTask> taskQueue =
- new L1PriorityQueue<>(
- 100, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
+ IndexedBlockingQueue<DriverTask> taskQueue =
+ new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
// Mock the instance with a ready future
ListenableFuture<Void> mockFuture = Mockito.mock(ListenableFuture.class);
@@ -267,11 +254,9 @@ public class FragmentInstanceTimeoutSentinelTest {
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
Mockito.when(mockDriver.isFinished()).thenReturn(false);
- AbstractExecutor executor =
- new FragmentInstanceTaskExecutor(
- "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
- FragmentInstanceTask testTask =
- new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
+ AbstractDriverThread executor =
+ new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());