You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/12 08:38:26 UTC
[iotdb] branch master updated: [IOTDB-5370] Implement MultilevelPriorityQueue for query schedule
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0e5e53ad97 [IOTDB-5370] Implement MultilevelPriorityQueue for query schedule
0e5e53ad97 is described below
commit 0e5e53ad97101c00e759f14886653ce1925c043a
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Thu Jan 12 16:38:21 2023 +0800
[IOTDB-5370] Implement MultilevelPriorityQueue for query schedule
---
.../db/mpp/execution/schedule/DriverScheduler.java | 32 ++-
.../mpp/execution/schedule/DriverTaskThread.java | 8 +-
.../mpp/execution/schedule/ExecutionContext.java | 10 +
.../mpp/execution/schedule/IDriverScheduler.java | 3 +-
.../queue/multilevelqueue/DriverTaskHandle.java | 92 +++++++
.../multilevelqueue/MultilevelPriorityQueue.java | 290 +++++++++++++++++++++
.../schedule/queue/multilevelqueue/Priority.java | 73 ++++++
.../db/mpp/execution/schedule/task/DriverTask.java | 71 +++--
.../schedule/DefaultDriverSchedulerTest.java | 87 ++++---
.../DriverTaskTimeoutSentinelThreadTest.java | 18 +-
10 files changed, 610 insertions(+), 74 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 4577f16809..e9f097f7bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -30,7 +30,9 @@ import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.mpp.execution.schedule.queue.L1PriorityQueue;
-import org.apache.iotdb.db.mpp.execution.schedule.queue.L2PriorityQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTaskHandle;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.MultilevelPriorityQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
@@ -47,8 +49,10 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
@@ -60,6 +64,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
private static final Logger logger = LoggerFactory.getLogger(DriverScheduler.class);
private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+ private static final double LEVEL_TIME_MULTIPLIER = 2;
+
public static DriverScheduler getInstance() {
return InstanceHolder.instance;
}
@@ -69,6 +75,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
private final Set<DriverTask> blockedTasks;
private final Map<QueryId, Map<FragmentInstanceId, Set<DriverTask>>> queryMap;
private final ITaskScheduler scheduler;
+
+ private final AtomicInteger nextDriverTaskHandleId = new AtomicInteger(0);
private IMPPDataExchangeManager blockManager;
private static final int MAX_CAPACITY =
@@ -82,8 +90,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
private DriverScheduler() {
this.readyQueue =
- new L2PriorityQueue<>(
- MAX_CAPACITY, new DriverTask.SchedulePriorityComparator(), new DriverTask());
+ new MultilevelPriorityQueue(LEVEL_TIME_MULTIPLIER, MAX_CAPACITY, new DriverTask());
this.timeoutQueue =
new L1PriorityQueue<>(MAX_CAPACITY, new DriverTask.TimeoutComparator(), new DriverTask());
this.queryMap = new ConcurrentHashMap<>();
@@ -162,12 +169,20 @@ public class DriverScheduler implements IDriverScheduler, IService {
@Override
public void submitDrivers(QueryId queryId, List<IDriver> drivers, long timeOut) {
+ DriverTaskHandle driverTaskHandle =
+ new DriverTaskHandle(
+ getNextDriverTaskHandleId(),
+ (MultilevelPriorityQueue) readyQueue,
+ OptionalInt.of(Integer.MAX_VALUE));
List<DriverTask> tasks =
drivers.stream()
.map(
v ->
new DriverTask(
- v, timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS, DriverTaskStatus.READY))
+ v,
+ timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS,
+ DriverTaskStatus.READY,
+ driverTaskHandle))
.collect(Collectors.toList());
for (DriverTask driverTask : tasks) {
queryMap
@@ -232,13 +247,13 @@ public class DriverScheduler implements IDriverScheduler, IService {
}
@Override
- public double getSchedulePriority(DriverTaskId driverTaskID) {
+ public Priority getSchedulePriority(DriverTaskId driverTaskID) {
DriverTask task = timeoutQueue.get(driverTaskID);
if (task == null) {
throw new IllegalStateException(
"the fragmentInstance " + driverTaskID.getFullId() + " has been cleared");
}
- return task.getSchedulePriority();
+ return task.getPriority();
}
private void clearDriverTask(DriverTask task) {
@@ -289,6 +304,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
}
}
+ private int getNextDriverTaskHandleId() {
+ return nextDriverTaskHandleId.getAndIncrement();
+ }
+
ITaskScheduler getScheduler() {
return scheduler;
}
@@ -346,6 +365,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
QUERY_METRICS.recordTaskQueueTime(
BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
task.setLastEnterReadyQueueTime(System.nanoTime());
+ task.resetLevelScheduledTime();
readyQueue.push(task);
blockedTasks.remove(task);
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
index 3dd977a9ee..33028af21b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.db.utils.stats.CpuTimer;
+import com.google.common.base.Ticker;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
@@ -44,6 +45,8 @@ public class DriverTaskThread extends AbstractDriverThread {
private static final Executor listeningExecutor =
IoTDBThreadPoolFactory.newCachedThreadPool("scheduler-notification");
+ private final Ticker ticker;
+
public DriverTaskThread(
String workerId,
ThreadGroup tg,
@@ -51,10 +54,12 @@ public class DriverTaskThread extends AbstractDriverThread {
ITaskScheduler scheduler,
ThreadProducer producer) {
super(workerId, tg, queue, scheduler, producer);
+ this.ticker = Ticker.systemTicker();
}
@Override
public void execute(DriverTask task) throws InterruptedException {
+ long startNanos = ticker.read();
// try to switch it to RUNNING
if (!scheduler.readyToRunning(task)) {
return;
@@ -63,15 +68,16 @@ public class DriverTaskThread extends AbstractDriverThread {
CpuTimer timer = new CpuTimer();
ListenableFuture<?> future = driver.processFor(EXECUTION_TIME_SLICE);
CpuTimer.CpuDuration duration = timer.elapsedTime();
- // long cost = System.nanoTime() - startTime;
// If the future is cancelled, the task is in an error and should be thrown.
if (future.isCancelled()) {
task.setAbortCause(DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED);
scheduler.toAborted(task);
return;
}
+ long quantaScheduledNanos = ticker.read() - startNanos;
ExecutionContext context = new ExecutionContext();
context.setCpuDuration(duration);
+ context.setScheduledTimeInNanos(quantaScheduledNanos);
context.setTimeSlice(EXECUTION_TIME_SLICE);
if (driver.isFinished()) {
scheduler.runningToFinished(task, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
index df821dac68..69959e4016 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
@@ -26,6 +26,8 @@ import io.airlift.units.Duration;
/** The execution context of a {@link DriverTask} */
public class ExecutionContext {
private CpuTimer.CpuDuration cpuDuration;
+
+ private long scheduledTimeInNanos;
private Duration timeSlice;
public CpuTimer.CpuDuration getCpuDuration() {
@@ -36,6 +38,14 @@ public class ExecutionContext {
this.cpuDuration = cpuDuration;
}
+ public long getScheduledTimeInNanos() {
+ return scheduledTimeInNanos;
+ }
+
+ public void setScheduledTimeInNanos(long scheduledTimeInNanos) {
+ this.scheduledTimeInNanos = scheduledTimeInNanos;
+ }
+
public Duration getTimeSlice() {
return timeSlice;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
index 93dd3e314c..5729a2718c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.schedule;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
import java.util.List;
@@ -59,5 +60,5 @@ public interface IDriverScheduler {
* @return the schedule priority.
* @throws IllegalStateException if the instance has already been cleared.
*/
- double getSchedulePriority(DriverTaskId driverTaskID);
+ Priority getSchedulePriority(DriverTaskId driverTaskID);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java
new file mode 100644
index 0000000000..e259803f99
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+
+public class DriverTaskHandle {
+
+ private final int driverTaskHandleId;
+
+ @GuardedBy("this")
+ private long scheduledTimeInNanos;
+
+ private final MultilevelPriorityQueue driverTaskQueue;
+
+ /** It is not used for now but can be used to limit the driverNum per Task in the future. */
+ private final OptionalInt maxDriversPerTask;
+
+ private final AtomicReference<Priority> priority = new AtomicReference<>(new Priority(0, 0));
+
+ public DriverTaskHandle(
+ int driverTaskHandleId,
+ MultilevelPriorityQueue driverTaskQueue,
+ OptionalInt maxDriversPerTask) {
+ this.driverTaskHandleId = driverTaskHandleId;
+ this.driverTaskQueue = requireNonNull(driverTaskQueue, "driverTaskQueue is null");
+ this.maxDriversPerTask = requireNonNull(maxDriversPerTask, "maxDriversPerTask is null");
+ }
+
+ public synchronized Priority addScheduledTimeInNanos(long durationNanos) {
+ scheduledTimeInNanos += durationNanos;
+ Priority newPriority =
+ driverTaskQueue.updatePriority(priority.get(), durationNanos, scheduledTimeInNanos);
+
+ priority.set(newPriority);
+ return newPriority;
+ }
+
+ public synchronized Priority resetLevelScheduledTime() {
+ long levelMinScheduledTime =
+ driverTaskQueue.getLevelMinScheduledTime(priority.get().getLevel(), scheduledTimeInNanos);
+ if (priority.get().getLevelScheduledTime() < levelMinScheduledTime) {
+ Priority newPriority = new Priority(priority.get().getLevel(), levelMinScheduledTime);
+ priority.set(newPriority);
+ return newPriority;
+ }
+
+ return priority.get();
+ }
+
+ public Priority getPriority() {
+ return priority.get();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DriverTaskHandle that = (DriverTaskHandle) o;
+ return driverTaskHandleId == that.driverTaskHandleId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(driverTaskHandleId);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
new file mode 100644
index 0000000000..97ecf3927e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue;
+
+import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
+
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is inspired by Trino <a
+ * href="https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/executor/MultilevelSplitQueue.java">...</a>
+ */
+public class MultilevelPriorityQueue extends IndexedBlockingQueue<DriverTask> {
+ /** Scheduled time threshold of TASK in each level */
+ static final int[] LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300};
+
+ /** the upper limit one Task can contribute to its level in one scheduled time */
+ static final long LEVEL_CONTRIBUTION_CAP = SECONDS.toNanos(30);
+
+ private final PriorityQueue<DriverTask>[] levelWaitingSplits;
+
+ /**
+ * Total amount of time each LEVEL has occupied, which decides which level we will take task from.
+ */
+ private final AtomicLong[] levelScheduledTime;
+
+ /** The minimum scheduled time which current TASK in each level has. */
+ private final AtomicLong[] levelMinScheduledTime;
+
+ /**
+ * Expected schedule time of each LEVEL.
+ *
+ * <p>The proportion of level0-level4 is: levelTimeMultiplier^4 : levelTimeMultiplier^3 :
+ * levelTimeMultiplier^2 : levelTimeMultiplier : 1
+ */
+ private final double levelTimeMultiplier;
+
+ public MultilevelPriorityQueue(
+ double levelTimeMultiplier, int maxCapacity, DriverTask queryHolder) {
+ super(maxCapacity, queryHolder);
+ this.levelScheduledTime = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length];
+ this.levelMinScheduledTime = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length];
+ this.levelWaitingSplits = new PriorityQueue[LEVEL_THRESHOLD_SECONDS.length];
+ for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
+ levelScheduledTime[level] = new AtomicLong();
+ levelMinScheduledTime[level] = new AtomicLong(-1);
+ levelWaitingSplits[level] = new PriorityQueue<>(new DriverTask.SchedulePriorityComparator());
+ }
+ this.levelTimeMultiplier = levelTimeMultiplier;
+ }
+
+ /**
+ * During periods of time when a level has no waiting splits, it will not accumulate scheduled
+ * time and will fall behind relative to other levels.
+ *
+ * <p>This can cause temporary starvation for other levels when splits do reach the
+ * previously-empty level.
+ *
+ * <p>To prevent this we set the scheduled time for levels which were empty to the expected
+ * scheduled time.
+ */
+ @Override
+ public void pushToQueue(DriverTask task) {
+ checkArgument(task != null, "DriverTask to be pushed is null");
+
+ int level = task.getPriority().getLevel();
+ if (levelWaitingSplits[level].isEmpty()) {
+ // Accesses to levelScheduledTime are not synchronized, so we have a data race
+ // here - our level time math will be off. However, the staleness is bounded by
+ // the fact that only running splits that complete during this computation
+ // can update the level time. Therefore, this is benign.
+ long level0Time = getLevel0TargetTime();
+ long levelExpectedTime = (long) (level0Time / Math.pow(levelTimeMultiplier, level));
+ long delta = levelExpectedTime - levelScheduledTime[level].get();
+ levelScheduledTime[level].addAndGet(delta);
+ }
+ levelWaitingSplits[level].offer(task);
+ }
+
+ protected DriverTask pollFirst() {
+ DriverTask result;
+ while (true) {
+ result = chooseLevelAndTask();
+ if (result.updatePriority()) {
+ // result.updatePriority() returns true means that the Priority of DriverTaskHandle the
+ // result belongs to has changed.
+ // All the DriverTasks of one DriverTaskHandle should be in the same level.
+ // We push the result into the queue and choose another DriverTask.
+ pushToQueue(result);
+ continue;
+ }
+ int selectedLevel = result.getPriority().getLevel();
+ levelMinScheduledTime[selectedLevel].set(result.getPriority().getLevelScheduledTime());
+ return result;
+ }
+ }
+
+ /**
+ * We attempt to give each level a target amount of scheduled time, which is configurable using
+ * levelTimeMultiplier.
+ *
+ * <p>This function selects the level that has the lowest ratio of actual to the target time with
+ * the objective of minimizing deviation from the target scheduled time. From this level, we pick
+ * the DriverTask with the lowest scheduled time.
+ */
+ private DriverTask chooseLevelAndTask() {
+ long targetScheduledTime = getLevel0TargetTime();
+ double worstRatio = 1;
+ int selectedLevel = -1;
+ for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
+ if (!levelWaitingSplits[level].isEmpty()) {
+ long levelTime = levelScheduledTime[level].get();
+ double ratio = levelTime == 0 ? 0 : targetScheduledTime / (1.0 * levelTime);
+ if (selectedLevel == -1 || ratio > worstRatio) {
+ worstRatio = ratio;
+ selectedLevel = level;
+ }
+ }
+
+ targetScheduledTime /= levelTimeMultiplier;
+ }
+
+ // selected level == -1 means that the queue is empty and this method is only called when the
+ // queue is not empty.
+ checkState(selectedLevel != -1, "selected level can not equal to -1");
+ DriverTask result = levelWaitingSplits[selectedLevel].poll();
+ checkState(result != null, "result driverTask cannot be null");
+ return result;
+ }
+
+ @Override
+ protected DriverTask remove(DriverTask driverTask) {
+ checkArgument(driverTask != null, "driverTask is null");
+ for (PriorityQueue<DriverTask> level : levelWaitingSplits) {
+ if (level.remove(driverTask)) {
+ break;
+ }
+ }
+ return driverTask;
+ }
+
+ @Override
+ protected boolean isEmpty() {
+ for (PriorityQueue<DriverTask> level : levelWaitingSplits) {
+ if (!level.isEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean contains(DriverTask driverTask) {
+ for (PriorityQueue<DriverTask> level : levelWaitingSplits) {
+ if (level.contains(driverTask)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ protected DriverTask get(DriverTask driverTask) {
+ // We do not support get() for MultilevelPriorityQueue since it is inefficient and not
+ // necessary.
+ throw new UnsupportedOperationException(
+ "MultilevelPriorityQueue does not support access element by get.");
+ }
+
+ @Override
+ protected void clearAllElements() {
+ for (PriorityQueue<DriverTask> level : levelWaitingSplits) {
+ level.clear();
+ }
+ }
+
+ /**
+ * Get the expected scheduled time of LEVEL0 based on the maximum scheduled time of all levels
+ * after normalization.
+ *
+ * <p>For example, the levelTimeMultiplier is 2, which means the expected proportion of level0-1
+ * is 2 : 1. However, the actual proportion of levelScheduledTime of level0 and level1 is 3 : 2,
+ * in this situation the expected time of level0 will be Math.max(3, 2 * 2) = 4.
+ *
+ * @return the expected scheduled time of LEVEL0
+ */
+ private synchronized long getLevel0TargetTime() {
+ long level0TargetTime = levelScheduledTime[0].get();
+ double currentMultiplier = levelTimeMultiplier;
+
+ for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
+ currentMultiplier /= levelTimeMultiplier;
+ long levelTime = levelScheduledTime[level].get();
+ level0TargetTime = Math.max(level0TargetTime, (long) (levelTime / currentMultiplier));
+ }
+
+ return level0TargetTime;
+ }
+
+ private void addLevelTime(int level, long nanos) {
+ levelScheduledTime[level].addAndGet(nanos);
+ }
+
+ /**
+ * MultilevelPriorityQueue charges the quanta run time to the task and the level it belongs to in
+ * an effort to maintain the target thread utilization ratios between levels and to maintain
+ * fairness within a level.
+ *
+ * <p>Consider an example DriverTask where a read hung for several minutes. This is either a bug
+ * or a failing dependency. In either case we do not want to charge the task too much, and we
+ * especially do not want to charge the level too much - i.e. cause other queries in this level to
+ * starve.
+ *
+ * @return the new priority for the task
+ */
+ public Priority updatePriority(Priority oldPriority, long quantaNanos, long scheduledNanos) {
+ int oldLevel = oldPriority.getLevel();
+ int newLevel = computeLevel(scheduledNanos);
+
+ long levelContribution = Math.min(quantaNanos, LEVEL_CONTRIBUTION_CAP);
+
+ if (oldLevel == newLevel) {
+ addLevelTime(oldLevel, levelContribution);
+ return new Priority(oldLevel, oldPriority.getLevelScheduledTime() + quantaNanos);
+ }
+
+ long remainingLevelContribution = levelContribution;
+ long remainingTaskTime = quantaNanos;
+
+ // a task normally slowly accrues scheduled time in a level and then moves to the next, but
+ // if the task had a particularly long quanta, accrue time to each level as if it had run
+ // in that level up to the level limit.
+ for (int currentLevel = oldLevel; currentLevel < newLevel; currentLevel++) {
+ long timeAccruedToLevel =
+ Math.min(
+ SECONDS.toNanos(
+ LEVEL_THRESHOLD_SECONDS[currentLevel + 1]
+ - LEVEL_THRESHOLD_SECONDS[currentLevel]),
+ remainingLevelContribution);
+ addLevelTime(currentLevel, timeAccruedToLevel);
+ remainingLevelContribution -= timeAccruedToLevel;
+ remainingTaskTime -= timeAccruedToLevel;
+ }
+
+ addLevelTime(newLevel, remainingLevelContribution);
+ // TODO figure out why add newLevelMinScheduledTime
+ long newLevelMinScheduledTime = getLevelMinScheduledTime(newLevel, scheduledNanos);
+ return new Priority(newLevel, newLevelMinScheduledTime + remainingTaskTime);
+ }
+
+ public long getLevelMinScheduledTime(int level, long taskThreadUsageNanos) {
+ levelMinScheduledTime[level].compareAndSet(-1, taskThreadUsageNanos);
+ return levelMinScheduledTime[level].get();
+ }
+
+ public static int computeLevel(long threadUsageNanos) {
+ long seconds = NANOSECONDS.toSeconds(threadUsageNanos);
+ for (int level = 0; level < (LEVEL_THRESHOLD_SECONDS.length - 1); level++) {
+ if (seconds < LEVEL_THRESHOLD_SECONDS[level + 1]) {
+ return level;
+ }
+ }
+
+ return LEVEL_THRESHOLD_SECONDS.length - 1;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/Priority.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/Priority.java
new file mode 100644
index 0000000000..734385784b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/Priority.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue;
+
+import javax.annotation.concurrent.Immutable;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * This class is inspired by Trino <a
+ * href="https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/executor/Priority.java">...</a>
+ * Priority is composed of a level and a within-level priority. Level decides which queue the
+ * DriverTask is placed in, while within-level priority decides which DriverTask is executed next in
+ * that level.
+ *
+ * <p>Tasks move from a lower to higher level as they exceed level thresholds of total scheduled
+ * time accrued to a task.
+ *
+ * <p>The priority within a level increases with the scheduled time accumulated in that level. This
+ * is necessary to achieve fairness when tasks acquire scheduled time at varying rates.
+ *
+ * <p>However, this priority is <b>not</b> equal to the task total accrued scheduled time. When a
+ * task graduates to a higher level, the level priority is set to the minimum current priority in
+ * the new level. This allows us to maintain instantaneous fairness in terms of scheduled time.
+ */
+@Immutable
+public final class Priority {
+ private final int level;
+
+ /**
+ * Occupied time in particular level of this task. The higher this value is, the later the task
+ * with this Priority will be polled out by a PriorityQueue.
+ */
+ private final long levelScheduledTime;
+
+ public Priority(int level, long levelScheduledTime) {
+ this.level = level;
+ this.levelScheduledTime = levelScheduledTime;
+ }
+
+ public int getLevel() {
+ return level;
+ }
+
+ public long getLevelScheduledTime() {
+ return levelScheduledTime;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("level", level)
+ .add("levelScheduledTime", levelScheduledTime)
+ .toString();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index ef173a45be..e008b84210 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -27,12 +27,14 @@ import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
import org.apache.iotdb.db.mpp.execution.schedule.ExecutionContext;
import org.apache.iotdb.db.mpp.execution.schedule.queue.ID;
import org.apache.iotdb.db.mpp.execution.schedule.queue.IDIndexedAccessible;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTaskHandle;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import java.util.Comparator;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -42,8 +44,6 @@ public class DriverTask implements IDIndexedAccessible {
private final IDriver driver;
private DriverTaskStatus status;
- // the higher this field is, the higher probability it will be scheduled.
- private volatile double schedulePriority;
private final long ddl;
private final Lock lock;
@@ -52,20 +52,25 @@ public class DriverTask implements IDIndexedAccessible {
private String abortCause;
+ private final AtomicReference<Priority> priority;
+
+ private final DriverTaskHandle driverTaskHandle;
private long lastEnterReadyQueueTime;
private long lastEnterBlockQueueTime;
/** Initialize a dummy instance for queryHolder */
public DriverTask() {
- this(new StubFragmentInstance(), 0L, null);
+ this(new StubFragmentInstance(), 0L, null, null);
}
- public DriverTask(IDriver driver, long timeoutMs, DriverTaskStatus status) {
+ public DriverTask(
+ IDriver driver, long timeoutMs, DriverTaskStatus status, DriverTaskHandle driverTaskHandle) {
this.driver = driver;
this.setStatus(status);
- this.schedulePriority = 0.0D;
this.ddl = System.currentTimeMillis() + timeoutMs;
this.lock = new ReentrantLock();
+ this.driverTaskHandle = driverTaskHandle;
+ this.priority = new AtomicReference<>(new Priority(0, 0));
}
public DriverTaskId getDriverTaskId() {
@@ -99,18 +104,7 @@ public class DriverTask implements IDIndexedAccessible {
* @param context the last execution context.
*/
public void updateSchedulePriority(ExecutionContext context) {
- // TODO: need to implement more complex here
-
- // 1. The penalty factor means that if a task executes less time in one schedule, it will have a
- // high schedule priority
- double penaltyFactor =
- context.getCpuDuration().getWall().getValue(TimeUnit.NANOSECONDS)
- / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
- // 2. If a task is nearly timeout, it should be scheduled as soon as possible.
- long base = System.currentTimeMillis() - ddl;
-
- // 3. Now the final schedulePriority is out, this may not be so reasonable.
- this.schedulePriority = base * penaltyFactor;
+ priority.set(driverTaskHandle.addScheduledTimeInNanos(context.getScheduledTimeInNanos()));
}
public void lock() {
@@ -121,10 +115,6 @@ public class DriverTask implements IDIndexedAccessible {
lock.unlock();
}
- public double getSchedulePriority() {
- return schedulePriority;
- }
-
public long getDDL() {
return ddl;
}
@@ -147,6 +137,31 @@ public class DriverTask implements IDIndexedAccessible {
this.abortCause = abortCause;
}
+ public Priority getPriority() {
+ return priority.get();
+ }
+
+ /**
+ * Updates the (potentially stale) priority value cached in this object. This should be called
+ * when this object is outside the queue.
+ *
+ * @return true if the level changed.
+ */
+ public boolean updatePriority() {
+ Priority newPriority = driverTaskHandle.getPriority();
+ Priority oldPriority = priority.getAndSet(newPriority);
+ return newPriority.getLevel() != oldPriority.getLevel();
+ }
+
+ /**
+ * Updates the task levelScheduledTime to be greater than or equal to the minimum
+ * levelScheduledTime within that level. This ensures that tasks that spend time blocked do not
+ * return and starve already-running tasks. Also updates the cached priority object.
+ */
+ public void resetLevelScheduledTime() {
+ priority.set(driverTaskHandle.resetLevelScheduledTime());
+ }
+
public long getLastEnterReadyQueueTime() {
return lastEnterReadyQueueTime;
}
@@ -181,7 +196,7 @@ public class DriverTask implements IDIndexedAccessible {
}
}
- /** a comparator of ddl, the higher the schedulePriority is, the low order it has. */
+ /** a comparator of DriverTask, the higher the levelScheduledTime is, the lower order it has. */
public static class SchedulePriorityComparator implements Comparator<DriverTask> {
@Override
@@ -189,11 +204,11 @@ public class DriverTask implements IDIndexedAccessible {
if (o1.getDriverTaskId().equals(o2.getDriverTaskId())) {
return 0;
}
- if (o1.getSchedulePriority() > o2.getSchedulePriority()) {
- return -1;
- }
- if (o1.getSchedulePriority() < o2.getSchedulePriority()) {
- return 1;
+ int result =
+ Long.compare(
+ o1.priority.get().getLevelScheduledTime(), o2.priority.get().getLevelScheduledTime());
+ if (result != 0) {
+ return result;
}
return o1.getDriverTaskId().compareTo(o2.getDriverTaskId());
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index 8a3d5280b3..fa1575d1f4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTaskHandle;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.MultilevelPriorityQueue;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
@@ -38,6 +40,7 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
+import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -58,6 +61,11 @@ public class DefaultDriverSchedulerTest {
manager.setBlockManager(mockMPPDataExchangeManager);
ITaskScheduler defaultScheduler = manager.getScheduler();
IDriver mockDriver = Mockito.mock(IDriver.class);
+ DriverTaskHandle driverTaskHandle =
+ new DriverTaskHandle(
+ 1,
+ (MultilevelPriorityQueue) manager.getReadyQueue(),
+ OptionalInt.of(Integer.MAX_VALUE));
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -71,7 +79,7 @@ public class DefaultDriverSchedulerTest {
DriverTaskStatus.RUNNING,
};
for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, status, driverTaskHandle);
manager.getBlockedTasks().add(testTask);
Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
@@ -82,13 +90,13 @@ public class DefaultDriverSchedulerTest {
defaultScheduler.blockedToReady(testTask);
Assert.assertEquals(status, testTask.getStatus());
Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
clear();
}
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
+ DriverTask testTask =
+ new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED, driverTaskHandle);
manager.getBlockedTasks().add(testTask);
Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
@@ -99,7 +107,6 @@ public class DefaultDriverSchedulerTest {
defaultScheduler.blockedToReady(testTask);
Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNotNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
@@ -114,6 +121,11 @@ public class DefaultDriverSchedulerTest {
manager.setBlockManager(mockMPPDataExchangeManager);
ITaskScheduler defaultScheduler = manager.getScheduler();
IDriver mockDriver = Mockito.mock(IDriver.class);
+ DriverTaskHandle driverTaskHandle =
+ new DriverTaskHandle(
+ 1,
+ (MultilevelPriorityQueue) manager.getReadyQueue(),
+ OptionalInt.of(Integer.MAX_VALUE));
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId =
@@ -128,7 +140,7 @@ public class DefaultDriverSchedulerTest {
DriverTaskStatus.RUNNING,
};
for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, status, driverTaskHandle);
Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -143,7 +155,8 @@ public class DefaultDriverSchedulerTest {
Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
clear();
}
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+ DriverTask testTask =
+ new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, driverTaskHandle);
Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -167,6 +180,11 @@ public class DefaultDriverSchedulerTest {
manager.setBlockManager(mockMPPDataExchangeManager);
ITaskScheduler defaultScheduler = manager.getScheduler();
IDriver mockDriver = Mockito.mock(IDriver.class);
+ DriverTaskHandle driverTaskHandle =
+ new DriverTaskHandle(
+ 1,
+ (MultilevelPriorityQueue) manager.getReadyQueue(),
+ OptionalInt.of(Integer.MAX_VALUE));
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -180,7 +198,7 @@ public class DefaultDriverSchedulerTest {
DriverTaskStatus.READY,
};
for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, status, driverTaskHandle);
Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -190,13 +208,13 @@ public class DefaultDriverSchedulerTest {
defaultScheduler.runningToReady(testTask, new ExecutionContext());
Assert.assertEquals(status, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
clear();
}
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+ DriverTask testTask =
+ new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING, driverTaskHandle);
Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -207,10 +225,9 @@ public class DefaultDriverSchedulerTest {
context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
context.setCpuDuration(new CpuTimer.CpuDuration());
defaultScheduler.runningToReady(testTask, context);
- Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+ // Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNotNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
@@ -225,6 +242,11 @@ public class DefaultDriverSchedulerTest {
manager.setBlockManager(mockMPPDataExchangeManager);
ITaskScheduler defaultScheduler = manager.getScheduler();
IDriver mockDriver = Mockito.mock(IDriver.class);
+ DriverTaskHandle driverTaskHandle =
+ new DriverTaskHandle(
+ 1,
+ (MultilevelPriorityQueue) manager.getReadyQueue(),
+ OptionalInt.of(Integer.MAX_VALUE));
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -238,7 +260,7 @@ public class DefaultDriverSchedulerTest {
DriverTaskStatus.READY,
};
for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, status, driverTaskHandle);
Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -248,13 +270,13 @@ public class DefaultDriverSchedulerTest {
defaultScheduler.runningToBlocked(testTask, new ExecutionContext());
Assert.assertEquals(status, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
clear();
}
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+ DriverTask testTask =
+ new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING, driverTaskHandle);
Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -265,10 +287,9 @@ public class DefaultDriverSchedulerTest {
context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
context.setCpuDuration(new CpuTimer.CpuDuration());
defaultScheduler.runningToBlocked(testTask, context);
- Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+ // Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
@@ -283,6 +304,11 @@ public class DefaultDriverSchedulerTest {
manager.setBlockManager(mockMPPDataExchangeManager);
ITaskScheduler defaultScheduler = manager.getScheduler();
IDriver mockDriver = Mockito.mock(IDriver.class);
+ DriverTaskHandle driverTaskHandle =
+ new DriverTaskHandle(
+ 1,
+ (MultilevelPriorityQueue) manager.getReadyQueue(),
+ OptionalInt.of(Integer.MAX_VALUE));
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -296,7 +322,7 @@ public class DefaultDriverSchedulerTest {
DriverTaskStatus.READY,
};
for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, status, driverTaskHandle);
Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -306,13 +332,13 @@ public class DefaultDriverSchedulerTest {
defaultScheduler.runningToFinished(testTask, new ExecutionContext());
Assert.assertEquals(status, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
clear();
}
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+ DriverTask testTask =
+ new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING, driverTaskHandle);
Set<DriverTask> taskSet = new HashSet<>();
taskSet.add(testTask);
Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -323,10 +349,9 @@ public class DefaultDriverSchedulerTest {
context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
context.setCpuDuration(new CpuTimer.CpuDuration());
defaultScheduler.runningToFinished(testTask, context);
- Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+ // Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
- Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
Assert.assertNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
@@ -340,6 +365,11 @@ public class DefaultDriverSchedulerTest {
manager.setBlockManager(mockMPPDataExchangeManager);
IDataNodeRPCService.Client mockMppServiceClient =
Mockito.mock(IDataNodeRPCService.Client.class);
+ DriverTaskHandle driverTaskHandle =
+ new DriverTaskHandle(
+ 1,
+ (MultilevelPriorityQueue) manager.getReadyQueue(),
+ OptionalInt.of(Integer.MAX_VALUE));
ITaskScheduler defaultScheduler = manager.getScheduler();
QueryId queryId = new QueryId("test");
FragmentInstanceId instanceId1 =
@@ -357,8 +387,10 @@ public class DefaultDriverSchedulerTest {
DriverTaskStatus.FINISHED, DriverTaskStatus.ABORTED,
};
for (DriverTaskStatus status : invalidStates) {
- DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
- DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+ DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status, driverTaskHandle);
+ DriverTask testTask2 =
+ new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED, driverTaskHandle);
+
Set<DriverTask> taskSet1 = new HashSet<>();
taskSet1.add(testTask1);
Set<DriverTask> taskSet2 = new HashSet<>();
@@ -376,8 +408,6 @@ public class DefaultDriverSchedulerTest {
Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask2.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
Assert.assertTrue(manager.getBlockedTasks().contains(testTask2));
- Assert.assertNull(manager.getReadyQueue().get(testTask1.getDriverTaskId()));
- Assert.assertNull(manager.getReadyQueue().get(testTask2.getDriverTaskId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask1.getDriverTaskId()));
Assert.assertNotNull(manager.getTimeoutQueue().get(testTask2.getDriverTaskId()));
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
@@ -398,9 +428,10 @@ public class DefaultDriverSchedulerTest {
Mockito.reset(mockDriver2);
Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
- DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
+ DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status, driverTaskHandle);
- DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+ DriverTask testTask2 =
+ new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED, driverTaskHandle);
Set<DriverTask> taskSet1 = new HashSet<>();
taskSet1.add(testTask1);
Set<DriverTask> taskSet2 = new HashSet<>();
@@ -422,8 +453,6 @@ public class DefaultDriverSchedulerTest {
Assert.assertEquals(DriverTaskStatus.ABORTED, testTask2.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
Assert.assertFalse(manager.getBlockedTasks().contains(testTask2));
- Assert.assertNull(manager.getReadyQueue().get(testTask1.getDriverTaskId()));
- Assert.assertNull(manager.getReadyQueue().get(testTask2.getDriverTaskId()));
Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getDriverTaskId()));
Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getDriverTaskId()));
Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index d55b4ead86..d9d2a3f36c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -73,28 +73,28 @@ public class DriverTaskTimeoutSentinelThreadTest {
"0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
// FINISHED status test
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.FINISHED);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.FINISHED, null);
executor.execute(testTask);
Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
// ABORTED status test
- testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.ABORTED);
+ testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.ABORTED, null);
executor.execute(testTask);
Assert.assertEquals(DriverTaskStatus.ABORTED, testTask.getStatus());
Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
// RUNNING status test
- testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+ testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING, null);
executor.execute(testTask);
Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
// BLOCKED status test
- testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
+ testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED, null);
executor.execute(testTask);
Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
@@ -134,7 +134,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
AbstractDriverThread executor =
new DriverTaskThread(
"0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertEquals(
@@ -175,7 +175,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
AbstractDriverThread executor =
new DriverTaskThread(
"0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());
@@ -225,7 +225,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
AbstractDriverThread executor =
new DriverTaskThread(
"0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());
@@ -276,7 +276,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
AbstractDriverThread executor =
new DriverTaskThread(
"0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());
@@ -317,7 +317,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
executor.close();
throw new RuntimeException("mock exception");
});
- DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+ DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null);
taskQueue.push(testTask);
executor.run(); // Here we use run() instead of start() to execute the task in the same thread
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());