You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/12 08:38:26 UTC

[iotdb] branch master updated: [IOTDB-5370] Implement MultilevelPriorityQueue for query schedule

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e5e53ad97 [IOTDB-5370] Implement MultilevelPriorityQueue for query schedule
0e5e53ad97 is described below

commit 0e5e53ad97101c00e759f14886653ce1925c043a
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Thu Jan 12 16:38:21 2023 +0800

    [IOTDB-5370] Implement MultilevelPriorityQueue for query schedule
---
 .../db/mpp/execution/schedule/DriverScheduler.java |  32 ++-
 .../mpp/execution/schedule/DriverTaskThread.java   |   8 +-
 .../mpp/execution/schedule/ExecutionContext.java   |  10 +
 .../mpp/execution/schedule/IDriverScheduler.java   |   3 +-
 .../queue/multilevelqueue/DriverTaskHandle.java    |  92 +++++++
 .../multilevelqueue/MultilevelPriorityQueue.java   | 290 +++++++++++++++++++++
 .../schedule/queue/multilevelqueue/Priority.java   |  73 ++++++
 .../db/mpp/execution/schedule/task/DriverTask.java |  71 +++--
 .../schedule/DefaultDriverSchedulerTest.java       |  87 ++++---
 .../DriverTaskTimeoutSentinelThreadTest.java       |  18 +-
 10 files changed, 610 insertions(+), 74 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 4577f16809..e9f097f7bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -30,7 +30,9 @@ import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.L1PriorityQueue;
-import org.apache.iotdb.db.mpp.execution.schedule.queue.L2PriorityQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTaskHandle;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.MultilevelPriorityQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
@@ -47,8 +49,10 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
@@ -60,6 +64,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
   private static final Logger logger = LoggerFactory.getLogger(DriverScheduler.class);
   private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
 
+  private static final double LEVEL_TIME_MULTIPLIER = 2;
+
   public static DriverScheduler getInstance() {
     return InstanceHolder.instance;
   }
@@ -69,6 +75,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
   private final Set<DriverTask> blockedTasks;
   private final Map<QueryId, Map<FragmentInstanceId, Set<DriverTask>>> queryMap;
   private final ITaskScheduler scheduler;
+
+  private final AtomicInteger nextDriverTaskHandleId = new AtomicInteger(0);
   private IMPPDataExchangeManager blockManager;
 
   private static final int MAX_CAPACITY =
@@ -82,8 +90,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
 
   private DriverScheduler() {
     this.readyQueue =
-        new L2PriorityQueue<>(
-            MAX_CAPACITY, new DriverTask.SchedulePriorityComparator(), new DriverTask());
+        new MultilevelPriorityQueue(LEVEL_TIME_MULTIPLIER, MAX_CAPACITY, new DriverTask());
     this.timeoutQueue =
         new L1PriorityQueue<>(MAX_CAPACITY, new DriverTask.TimeoutComparator(), new DriverTask());
     this.queryMap = new ConcurrentHashMap<>();
@@ -162,12 +169,20 @@ public class DriverScheduler implements IDriverScheduler, IService {
 
   @Override
   public void submitDrivers(QueryId queryId, List<IDriver> drivers, long timeOut) {
+    DriverTaskHandle driverTaskHandle =
+        new DriverTaskHandle(
+            getNextDriverTaskHandleId(),
+            (MultilevelPriorityQueue) readyQueue,
+            OptionalInt.of(Integer.MAX_VALUE));
     List<DriverTask> tasks =
         drivers.stream()
             .map(
                 v ->
                     new DriverTask(
-                        v, timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS, DriverTaskStatus.READY))
+                        v,
+                        timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS,
+                        DriverTaskStatus.READY,
+                        driverTaskHandle))
             .collect(Collectors.toList());
     for (DriverTask driverTask : tasks) {
       queryMap
@@ -232,13 +247,13 @@ public class DriverScheduler implements IDriverScheduler, IService {
   }
 
   @Override
-  public double getSchedulePriority(DriverTaskId driverTaskID) {
+  public Priority getSchedulePriority(DriverTaskId driverTaskID) {
     DriverTask task = timeoutQueue.get(driverTaskID);
     if (task == null) {
       throw new IllegalStateException(
           "the fragmentInstance " + driverTaskID.getFullId() + " has been cleared");
     }
-    return task.getSchedulePriority();
+    return task.getPriority();
   }
 
   private void clearDriverTask(DriverTask task) {
@@ -289,6 +304,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
     }
   }
 
+  private int getNextDriverTaskHandleId() {
+    return nextDriverTaskHandleId.getAndIncrement();
+  }
+
   ITaskScheduler getScheduler() {
     return scheduler;
   }
@@ -346,6 +365,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
         QUERY_METRICS.recordTaskQueueTime(
             BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
         task.setLastEnterReadyQueueTime(System.nanoTime());
+        task.resetLevelScheduledTime();
         readyQueue.push(task);
         blockedTasks.remove(task);
       } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
index 3dd977a9ee..33028af21b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.db.utils.stats.CpuTimer;
 
+import com.google.common.base.Ticker;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.units.Duration;
 
@@ -44,6 +45,8 @@ public class DriverTaskThread extends AbstractDriverThread {
   private static final Executor listeningExecutor =
       IoTDBThreadPoolFactory.newCachedThreadPool("scheduler-notification");
 
+  private final Ticker ticker;
+
   public DriverTaskThread(
       String workerId,
       ThreadGroup tg,
@@ -51,10 +54,12 @@ public class DriverTaskThread extends AbstractDriverThread {
       ITaskScheduler scheduler,
       ThreadProducer producer) {
     super(workerId, tg, queue, scheduler, producer);
+    this.ticker = Ticker.systemTicker();
   }
 
   @Override
   public void execute(DriverTask task) throws InterruptedException {
+    long startNanos = ticker.read();
     // try to switch it to RUNNING
     if (!scheduler.readyToRunning(task)) {
       return;
@@ -63,15 +68,16 @@ public class DriverTaskThread extends AbstractDriverThread {
     CpuTimer timer = new CpuTimer();
     ListenableFuture<?> future = driver.processFor(EXECUTION_TIME_SLICE);
     CpuTimer.CpuDuration duration = timer.elapsedTime();
-    // long cost = System.nanoTime() - startTime;
     // If the future is cancelled, the task is in an error and should be thrown.
     if (future.isCancelled()) {
       task.setAbortCause(DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED);
       scheduler.toAborted(task);
       return;
     }
+    long quantaScheduledNanos = ticker.read() - startNanos;
     ExecutionContext context = new ExecutionContext();
     context.setCpuDuration(duration);
+    context.setScheduledTimeInNanos(quantaScheduledNanos);
     context.setTimeSlice(EXECUTION_TIME_SLICE);
     if (driver.isFinished()) {
       scheduler.runningToFinished(task, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
index df821dac68..69959e4016 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/ExecutionContext.java
@@ -26,6 +26,8 @@ import io.airlift.units.Duration;
 /** The execution context of a {@link DriverTask} */
 public class ExecutionContext {
   private CpuTimer.CpuDuration cpuDuration;
+
+  private long scheduledTimeInNanos;
   private Duration timeSlice;
 
   public CpuTimer.CpuDuration getCpuDuration() {
@@ -36,6 +38,14 @@ public class ExecutionContext {
     this.cpuDuration = cpuDuration;
   }
 
+  public long getScheduledTimeInNanos() {
+    return scheduledTimeInNanos;
+  }
+
+  public void setScheduledTimeInNanos(long scheduledTimeInNanos) {
+    this.scheduledTimeInNanos = scheduledTimeInNanos;
+  }
+
   public Duration getTimeSlice() {
     return timeSlice;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
index 93dd3e314c..5729a2718c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/IDriverScheduler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.schedule;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
 
 import java.util.List;
@@ -59,5 +60,5 @@ public interface IDriverScheduler {
    * @return the schedule priority.
    * @throws IllegalStateException if the instance has already been cleared.
    */
-  double getSchedulePriority(DriverTaskId driverTaskID);
+  Priority getSchedulePriority(DriverTaskId driverTaskID);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java
new file mode 100644
index 0000000000..e259803f99
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Scheduling state shared by a group of DriverTasks. {@code DriverScheduler#submitDrivers}
+ * creates one handle per call and passes it to every DriverTask built in that call.
+ *
+ * <p>The handle accumulates the total scheduled time of the whole group and derives the group's
+ * {@link Priority} from it via {@link MultilevelPriorityQueue#updatePriority}, so every DriverTask
+ * sharing the handle sees the same priority.
+ */
+public class DriverTaskHandle {
+
+  /** Identity of the handle; the only field used by equals and hashCode. */
+  private final int driverTaskHandleId;
+
+  /** Total scheduled time, in nanoseconds, accrued by all DriverTasks sharing this handle. */
+  @GuardedBy("this")
+  private long scheduledTimeInNanos;
+
+  /** The queue whose level bookkeeping is updated when this handle accrues time. */
+  private final MultilevelPriorityQueue driverTaskQueue;
+
+  /** It is not used for now but can be used to limit the driverNum per Task in the future. */
+  private final OptionalInt maxDriversPerTask;
+
+  /**
+   * Current priority of the group. Only replaced inside the synchronized methods below, but held
+   * in an AtomicReference so that {@link #getPriority()} can read it without locking.
+   */
+  private final AtomicReference<Priority> priority = new AtomicReference<>(new Priority(0, 0));
+
+  /**
+   * @param driverTaskHandleId identity of the handle, used by equals and hashCode
+   * @param driverTaskQueue the queue whose level times are updated on behalf of this handle
+   * @param maxDriversPerTask currently unused
+   * @throws NullPointerException if {@code driverTaskQueue} or {@code maxDriversPerTask} is null
+   */
+  public DriverTaskHandle(
+      int driverTaskHandleId,
+      MultilevelPriorityQueue driverTaskQueue,
+      OptionalInt maxDriversPerTask) {
+    this.driverTaskHandleId = driverTaskHandleId;
+    this.driverTaskQueue = requireNonNull(driverTaskQueue, "driverTaskQueue is null");
+    this.maxDriversPerTask = requireNonNull(maxDriversPerTask, "maxDriversPerTask is null");
+  }
+
+  /**
+   * Charges one execution quantum to this handle and recomputes its priority.
+   *
+   * @param durationNanos the scheduled time, in nanoseconds, of the quantum that just finished
+   * @return the new priority, which also becomes this handle's current priority
+   */
+  public synchronized Priority addScheduledTimeInNanos(long durationNanos) {
+    scheduledTimeInNanos += durationNanos;
+    Priority newPriority =
+        driverTaskQueue.updatePriority(priority.get(), durationNanos, scheduledTimeInNanos);
+    priority.set(newPriority);
+    return newPriority;
+  }
+
+  /**
+   * Raises this handle's within-level scheduled time to the current minimum of its level if it
+   * has fallen below that minimum, e.g. while its tasks were blocked (presumably reached through
+   * DriverTask#resetLevelScheduledTime; verify). A lower value is polled earlier, so this keeps a
+   * stale, low value from letting the group jump ahead of every other task in its level.
+   *
+   * @return the (possibly unchanged) current priority
+   */
+  public synchronized Priority resetLevelScheduledTime() {
+    Priority current = priority.get();
+    long levelMinScheduledTime =
+        driverTaskQueue.getLevelMinScheduledTime(current.getLevel(), scheduledTimeInNanos);
+    if (current.getLevelScheduledTime() >= levelMinScheduledTime) {
+      return current;
+    }
+    Priority raised = new Priority(current.getLevel(), levelMinScheduledTime);
+    priority.set(raised);
+    return raised;
+  }
+
+  /** Returns the current priority of this handle without locking. */
+  public Priority getPriority() {
+    return priority.get();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DriverTaskHandle that = (DriverTaskHandle) o;
+    return driverTaskHandleId == that.driverTaskHandleId;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(driverTaskHandleId);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
new file mode 100644
index 0000000000..97ecf3927e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue;
+
+import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
+import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
+
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A multilevel feedback queue of {@link DriverTask}s, inspired by Trino's <a
+ * href="https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/executor/MultilevelSplitQueue.java">MultilevelSplitQueue</a>.
+ *
+ * <p>A task's level comes from its {@link Priority}, which is derived from the accumulated
+ * scheduled time of its {@link DriverTaskHandle} (see {@link #computeLevel}), so tasks that have
+ * run longer sit in higher levels. Each level tracks how much scheduled time it has received, and
+ * {@link #pollFirst()} serves the non-empty level that is furthest behind its target share; each
+ * level's target is {@code levelTimeMultiplier} times smaller than the previous level's. Within a
+ * level, tasks are ordered by {@link DriverTask.SchedulePriorityComparator}.
+ */
+public class MultilevelPriorityQueue extends IndexedBlockingQueue<DriverTask> {
+  /** Lower bound, in seconds of accumulated scheduled time, of each level. */
+  static final int[] LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300};
+
+  /** The upper limit one task can contribute to its level's scheduled time in one quantum. */
+  static final long LEVEL_CONTRIBUTION_CAP = SECONDS.toNanos(30);
+
+  /** Waiting tasks of each level. */
+  private final PriorityQueue<DriverTask>[] levelWaitingSplits;
+
+  /**
+   * Total amount of time each LEVEL has occupied, which decides which level we will take task from.
+   */
+  private final AtomicLong[] levelScheduledTime;
+
+  /**
+   * The level scheduled time of the task most recently polled from each level, or -1 until it is
+   * first initialized by {@link #getLevelMinScheduledTime}. Used as the starting within-level
+   * scheduled time for tasks entering or returning to a level.
+   */
+  private final AtomicLong[] levelMinScheduledTime;
+
+  /**
+   * Expected schedule time ratio between adjacent levels.
+   *
+   * <p>The proportion of level0-level4 is: levelTimeMultiplier^4 : levelTimeMultiplier^3 :
+   * levelTimeMultiplier^2 : levelTimeMultiplier : 1
+   */
+  private final double levelTimeMultiplier;
+
+  /**
+   * @param levelTimeMultiplier ratio between the target scheduled time of adjacent levels; must be
+   *     positive
+   * @param maxCapacity passed to {@link IndexedBlockingQueue}
+   * @param queryHolder passed to {@link IndexedBlockingQueue}
+   * @throws IllegalArgumentException if {@code levelTimeMultiplier} is not positive
+   */
+  public MultilevelPriorityQueue(
+      double levelTimeMultiplier, int maxCapacity, DriverTask queryHolder) {
+    super(maxCapacity, queryHolder);
+    checkArgument(levelTimeMultiplier > 0, "levelTimeMultiplier must be positive");
+    int levelCount = LEVEL_THRESHOLD_SECONDS.length;
+    this.levelScheduledTime = new AtomicLong[levelCount];
+    this.levelMinScheduledTime = new AtomicLong[levelCount];
+    // Java cannot create generic arrays; the raw array only ever holds
+    // PriorityQueue<DriverTask> instances, so this unchecked assignment is safe.
+    @SuppressWarnings("unchecked")
+    PriorityQueue<DriverTask>[] waitingQueues = new PriorityQueue[levelCount];
+    this.levelWaitingSplits = waitingQueues;
+    for (int level = 0; level < levelCount; level++) {
+      levelScheduledTime[level] = new AtomicLong();
+      levelMinScheduledTime[level] = new AtomicLong(-1);
+      levelWaitingSplits[level] = new PriorityQueue<>(new DriverTask.SchedulePriorityComparator());
+    }
+    this.levelTimeMultiplier = levelTimeMultiplier;
+  }
+
+  /**
+   * Adds a task to the waiting queue of the level given by its current priority.
+   *
+   * <p>During periods of time when a level has no waiting tasks, it will not accumulate scheduled
+   * time and will fall behind relative to other levels. This can cause temporary starvation for
+   * other levels when tasks do reach the previously-empty level. To prevent this, before pushing
+   * into an empty level we set its scheduled time to the expected scheduled time.
+   *
+   * @throws IllegalArgumentException if {@code task} is null
+   */
+  @Override
+  public void pushToQueue(DriverTask task) {
+    checkArgument(task != null, "DriverTask to be pushed is null");
+
+    int level = task.getPriority().getLevel();
+    if (levelWaitingSplits[level].isEmpty()) {
+      // Accesses to levelScheduledTime are not synchronized, so we have a data race
+      // here - our level time math will be off. However, the staleness is bounded by
+      // the fact that only running tasks that complete during this computation
+      // can update the level time. Therefore, this is benign.
+      long level0Time = getLevel0TargetTime();
+      long levelExpectedTime = (long) (level0Time / Math.pow(levelTimeMultiplier, level));
+      // Add the difference instead of calling set(), so that time added concurrently by
+      // addLevelTime between the get() and the addAndGet() is preserved.
+      long delta = levelExpectedTime - levelScheduledTime[level].get();
+      levelScheduledTime[level].addAndGet(delta);
+    }
+    levelWaitingSplits[level].offer(task);
+  }
+
+  /**
+   * Removes and returns the next task to run. Must only be called when the queue is not empty.
+   *
+   * <p>All the DriverTasks of one DriverTaskHandle should be in the same level. If {@code
+   * DriverTask#updatePriority} reports that the priority of the chosen task's handle has changed,
+   * the task is pushed back under its new priority and another task is chosen. Otherwise, the
+   * chosen level's minimum scheduled time is set to the task's level scheduled time.
+   */
+  @Override
+  protected DriverTask pollFirst() {
+    while (true) {
+      DriverTask result = chooseLevelAndTask();
+      if (!result.updatePriority()) {
+        Priority priority = result.getPriority();
+        levelMinScheduledTime[priority.getLevel()].set(priority.getLevelScheduledTime());
+        return result;
+      }
+      pushToQueue(result);
+    }
+  }
+
+  /**
+   * We attempt to give each level a target amount of scheduled time, which is configurable using
+   * levelTimeMultiplier.
+   *
+   * <p>This function selects the level that has the lowest ratio of actual to the target time with
+   * the objective of minimizing deviation from the target scheduled time. From this level, we pick
+   * the DriverTask that its PriorityQueue orders first.
+   *
+   * @throws IllegalStateException if every level is empty
+   */
+  private DriverTask chooseLevelAndTask() {
+    long targetScheduledTime = getLevel0TargetTime();
+    double worstRatio = 1;
+    int selectedLevel = -1;
+    for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
+      if (!levelWaitingSplits[level].isEmpty()) {
+        long levelTime = levelScheduledTime[level].get();
+        // NOTE(review): a level with no accrued time gets ratio 0 and is therefore the least
+        // preferred, although by the actual/target criterion it is the furthest behind.
+        // pushToQueue normally lifts an empty level to its expected time, so this mostly
+        // matters when all level times are still 0.
+        double ratio = levelTime == 0 ? 0 : targetScheduledTime / (1.0 * levelTime);
+        if (selectedLevel == -1 || ratio > worstRatio) {
+          worstRatio = ratio;
+          selectedLevel = level;
+        }
+      }
+
+      targetScheduledTime /= levelTimeMultiplier;
+    }
+
+    // selected level == -1 means that the queue is empty and this method is only called when the
+    // queue is not empty.
+    checkState(selectedLevel != -1, "selected level can not equal to -1");
+    DriverTask result = levelWaitingSplits[selectedLevel].poll();
+    checkState(result != null, "result driverTask cannot be null");
+    return result;
+  }
+
+  /**
+   * Removes {@code driverTask} from whichever level holds it, if any.
+   *
+   * @return the argument itself, whether or not it was found
+   * @throws IllegalArgumentException if {@code driverTask} is null
+   */
+  @Override
+  protected DriverTask remove(DriverTask driverTask) {
+    checkArgument(driverTask != null, "driverTask is null");
+    for (PriorityQueue<DriverTask> level : levelWaitingSplits) {
+      if (level.remove(driverTask)) {
+        break;
+      }
+    }
+    return driverTask;
+  }
+
+  /** Returns true iff no level holds a waiting task. */
+  @Override
+  protected boolean isEmpty() {
+    for (PriorityQueue<DriverTask> level : levelWaitingSplits) {
+      if (!level.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Returns true iff some level holds {@code driverTask}. */
+  @Override
+  protected boolean contains(DriverTask driverTask) {
+    for (PriorityQueue<DriverTask> level : levelWaitingSplits) {
+      if (level.contains(driverTask)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  protected DriverTask get(DriverTask driverTask) {
+    // We do not support get() for MultilevelPriorityQueue since it is inefficient and not
+    // necessary.
+    throw new UnsupportedOperationException(
+        "MultilevelPriorityQueue does not support access element by get.");
+  }
+
+  /** Removes every waiting task from every level; level time bookkeeping is left unchanged. */
+  @Override
+  protected void clearAllElements() {
+    for (PriorityQueue<DriverTask> level : levelWaitingSplits) {
+      level.clear();
+    }
+  }
+
+  /**
+   * Get the expected scheduled time of LEVEL0 based on the maximum scheduled time of all levels
+   * after normalization.
+   *
+   * <p>For example, the levelTimeMultiplier is 2, which means the expected proportion of level0-1
+   * is 2 : 1. However, the actual proportion of levelScheduledTime of level0 and level1 is 3 : 2,
+   * in this situation the expected time of level0 will be Math.max(3, 2 * 2) = 4.
+   *
+   * @return the expected scheduled time of LEVEL0
+   */
+  private synchronized long getLevel0TargetTime() {
+    long level0TargetTime = levelScheduledTime[0].get();
+    double currentMultiplier = levelTimeMultiplier;
+
+    for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
+      // currentMultiplier becomes 1, 1/m, 1/m^2, ... so levelTime / currentMultiplier scales
+      // each level's time up to its level-0 equivalent.
+      currentMultiplier /= levelTimeMultiplier;
+      long levelTime = levelScheduledTime[level].get();
+      level0TargetTime = Math.max(level0TargetTime, (long) (levelTime / currentMultiplier));
+    }
+
+    return level0TargetTime;
+  }
+
+  /** Adds {@code nanos} to the accumulated scheduled time of {@code level}. */
+  private void addLevelTime(int level, long nanos) {
+    levelScheduledTime[level].addAndGet(nanos);
+  }
+
+  /**
+   * MultilevelPriorityQueue charges the quanta run time to the task and the level it belongs to in
+   * an effort to maintain the target thread utilization ratios between levels and to maintain
+   * fairness within a level.
+   *
+   * <p>Consider an example DriverTask where a read hung for several minutes. This is either a bug
+   * or a failing dependency. In either case we do not want to charge the task too much, and we
+   * especially do not want to charge the level too much - i.e. cause other queries in this level to
+   * starve.
+   *
+   * @param oldPriority the priority before this quantum
+   * @param quantaNanos the scheduled time of the quantum that just finished
+   * @param scheduledNanos the total scheduled time accrued so far, including this quantum
+   * @return the new priority for the task
+   */
+  public Priority updatePriority(Priority oldPriority, long quantaNanos, long scheduledNanos) {
+    int oldLevel = oldPriority.getLevel();
+    int newLevel = computeLevel(scheduledNanos);
+
+    // Cap what one quantum can charge to the levels (see the hung-read example above); the task
+    // itself is still charged the full quantum.
+    long levelContribution = Math.min(quantaNanos, LEVEL_CONTRIBUTION_CAP);
+
+    if (oldLevel == newLevel) {
+      addLevelTime(oldLevel, levelContribution);
+      return new Priority(oldLevel, oldPriority.getLevelScheduledTime() + quantaNanos);
+    }
+
+    long remainingLevelContribution = levelContribution;
+    long remainingTaskTime = quantaNanos;
+
+    // a task normally slowly accrues scheduled time in a level and then moves to the next, but
+    // if the task had a particularly long quanta, accrue time to each level as if it had run
+    // in that level up to the level limit.
+    for (int currentLevel = oldLevel; currentLevel < newLevel; currentLevel++) {
+      long timeAccruedToLevel =
+          Math.min(
+              SECONDS.toNanos(
+                  LEVEL_THRESHOLD_SECONDS[currentLevel + 1]
+                      - LEVEL_THRESHOLD_SECONDS[currentLevel]),
+              remainingLevelContribution);
+      addLevelTime(currentLevel, timeAccruedToLevel);
+      remainingLevelContribution -= timeAccruedToLevel;
+      remainingTaskTime -= timeAccruedToLevel;
+    }
+
+    addLevelTime(newLevel, remainingLevelContribution);
+    // A task entering a new level starts from that level's current minimum within-level
+    // scheduled time plus the part of this quantum not accrued to lower levels; as described on
+    // Priority, this keeps scheduling within the new level fair.
+    long newLevelMinScheduledTime = getLevelMinScheduledTime(newLevel, scheduledNanos);
+    return new Priority(newLevel, newLevelMinScheduledTime + remainingTaskTime);
+  }
+
+  /**
+   * Returns the minimum within-level scheduled time recorded for {@code level}. If none has been
+   * recorded yet (the stored value is -1), {@code taskThreadUsageNanos} is stored and returned.
+   */
+  public long getLevelMinScheduledTime(int level, long taskThreadUsageNanos) {
+    levelMinScheduledTime[level].compareAndSet(-1, taskThreadUsageNanos);
+    return levelMinScheduledTime[level].get();
+  }
+
+  /**
+   * Maps a total scheduled time to its level: the highest level whose threshold in {@link
+   * #LEVEL_THRESHOLD_SECONDS} is not greater than the time truncated to whole seconds. Negative
+   * inputs map to level 0.
+   */
+  public static int computeLevel(long threadUsageNanos) {
+    long seconds = NANOSECONDS.toSeconds(threadUsageNanos);
+    for (int level = 0; level < (LEVEL_THRESHOLD_SECONDS.length - 1); level++) {
+      if (seconds < LEVEL_THRESHOLD_SECONDS[level + 1]) {
+        return level;
+      }
+    }
+
+    return LEVEL_THRESHOLD_SECONDS.length - 1;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/Priority.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/Priority.java
new file mode 100644
index 0000000000..734385784b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/multilevelqueue/Priority.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * This class is inspired by Trino <a
+ * href="https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/executor/Priority.java">Priority</a>.
+ *
+ * <p>Priority is composed of a level and a within-level priority. Level decides which queue the
+ * DriverTask is placed in, while within-level priority decides which DriverTask is executed next in
+ * that level.
+ *
+ * <p>Tasks move from a lower to higher level as they exceed level thresholds of total scheduled
+ * time accrued to a task.
+ *
+ * <p>The priority within a level increases with the scheduled time accumulated in that level. This
+ * is necessary to achieve fairness when tasks acquire scheduled time at varying rates.
+ *
+ * <p>However, this priority is <b>not</b> equal to the task total accrued scheduled time. When a
+ * task graduates to a higher level, the level priority is set to the minimum current priority in
+ * the new level. This allows us to maintain instantaneous fairness in terms of scheduled time.
+ *
+ * <p>Instances are immutable value objects: two priorities are equal iff both fields are equal.
+ */
+@Immutable
+public final class Priority {
+  /** Index of the level queue this priority belongs to; never negative. */
+  private final int level;
+
+  /**
+   * Occupied time in particular level of this task. The higher this value is, the later the task
+   * with this Priority will be polled out by a PriorityQueue.
+   */
+  private final long levelScheduledTime;
+
+  /**
+   * @param level the level, must be non-negative
+   * @param levelScheduledTime the within-level scheduled time
+   * @throws IllegalArgumentException if {@code level} is negative
+   */
+  public Priority(int level, long levelScheduledTime) {
+    checkArgument(level >= 0, "level must be non-negative, but was %s", level);
+    this.level = level;
+    this.levelScheduledTime = levelScheduledTime;
+  }
+
+  public int getLevel() {
+    return level;
+  }
+
+  public long getLevelScheduledTime() {
+    return levelScheduledTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof Priority)) {
+      return false;
+    }
+    Priority that = (Priority) o;
+    return level == that.level && levelScheduledTime == that.levelScheduledTime;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(level, levelScheduledTime);
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this)
+        .add("level", level)
+        .add("levelScheduledTime", levelScheduledTime)
+        .toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index ef173a45be..e008b84210 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -27,12 +27,14 @@ import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
 import org.apache.iotdb.db.mpp.execution.schedule.ExecutionContext;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.ID;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.IDIndexedAccessible;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTaskHandle;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.units.Duration;
 
 import java.util.Comparator;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -42,8 +44,6 @@ public class DriverTask implements IDIndexedAccessible {
   private final IDriver driver;
   private DriverTaskStatus status;
 
-  // the higher this field is, the higher probability it will be scheduled.
-  private volatile double schedulePriority;
   private final long ddl;
   private final Lock lock;
 
@@ -52,20 +52,25 @@ public class DriverTask implements IDIndexedAccessible {
 
   private String abortCause;
 
+  private final AtomicReference<Priority> priority;
+
+  private final DriverTaskHandle driverTaskHandle;
   private long lastEnterReadyQueueTime;
   private long lastEnterBlockQueueTime;
 
   /** Initialize a dummy instance for queryHolder */
   public DriverTask() {
-    this(new StubFragmentInstance(), 0L, null);
+    this(new StubFragmentInstance(), 0L, null, null);
   }
 
-  public DriverTask(IDriver driver, long timeoutMs, DriverTaskStatus status) {
+  public DriverTask(
+      IDriver driver, long timeoutMs, DriverTaskStatus status, DriverTaskHandle driverTaskHandle) {
     this.driver = driver;
     this.setStatus(status);
-    this.schedulePriority = 0.0D;
     this.ddl = System.currentTimeMillis() + timeoutMs;
     this.lock = new ReentrantLock();
+    this.driverTaskHandle = driverTaskHandle;
+    this.priority = new AtomicReference<>(new Priority(0, 0));
   }
 
   public DriverTaskId getDriverTaskId() {
@@ -99,18 +104,7 @@ public class DriverTask implements IDIndexedAccessible {
    * @param context the last execution context.
    */
   public void updateSchedulePriority(ExecutionContext context) {
-    // TODO: need to implement more complex here
-
-    // 1. The penalty factor means that if a task executes less time in one schedule, it will have a
-    // high schedule priority
-    double penaltyFactor =
-        context.getCpuDuration().getWall().getValue(TimeUnit.NANOSECONDS)
-            / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
-    // 2. If a task is nearly timeout, it should be scheduled as soon as possible.
-    long base = System.currentTimeMillis() - ddl;
-
-    // 3. Now the final schedulePriority is out, this may not be so reasonable.
-    this.schedulePriority = base * penaltyFactor;
+    priority.set(driverTaskHandle.addScheduledTimeInNanos(context.getScheduledTimeInNanos()));
   }
 
   public void lock() {
@@ -121,10 +115,6 @@ public class DriverTask implements IDIndexedAccessible {
     lock.unlock();
   }
 
-  public double getSchedulePriority() {
-    return schedulePriority;
-  }
-
   public long getDDL() {
     return ddl;
   }
@@ -147,6 +137,31 @@ public class DriverTask implements IDIndexedAccessible {
     this.abortCause = abortCause;
   }
 
+  /** Returns the cached priority, which may lag behind the task handle's current priority. */
+  public Priority getPriority() {
+    return this.priority.get();
+  }
+
+  /**
+   * Replaces the cached priority with the task handle's current one. This should only be called
+   * while this task is outside the queue; SchedulePriorityComparator reads the cached value.
+   * @return true if the level changed.
+   */
+  public boolean updatePriority() {
+    Priority latest = driverTaskHandle.getPriority();
+    Priority previous = priority.getAndSet(latest);
+    return latest.getLevel() != previous.getLevel();
+  }
+
+  /**
+   * Updates the task levelScheduledTime to be greater than or equal to the minimum
+   * levelScheduledTime within that level. This ensures that tasks that spend time blocked do not
+   * return and starve already-running tasks. Also updates the cached priority object.
+   */
+  public void resetLevelScheduledTime() {
+    priority.set(driverTaskHandle.resetLevelScheduledTime());
+  }
+
   public long getLastEnterReadyQueueTime() {
     return lastEnterReadyQueueTime;
   }
@@ -181,7 +196,7 @@ public class DriverTask implements IDIndexedAccessible {
     }
   }
 
-  /** a comparator of ddl, the higher the schedulePriority is, the low order it has. */
+  /** Orders DriverTasks by ascending levelScheduledTime, breaking ties by DriverTaskId. */
   public static class SchedulePriorityComparator implements Comparator<DriverTask> {
 
     @Override
@@ -189,11 +204,11 @@ public class DriverTask implements IDIndexedAccessible {
       if (o1.getDriverTaskId().equals(o2.getDriverTaskId())) {
         return 0;
       }
-      if (o1.getSchedulePriority() > o2.getSchedulePriority()) {
-        return -1;
-      }
-      if (o1.getSchedulePriority() < o2.getSchedulePriority()) {
-        return 1;
+      int result =
+          Long.compare(
+              o1.priority.get().getLevelScheduledTime(), o2.priority.get().getLevelScheduledTime());
+      if (result != 0) {
+        return result;
       }
       return o1.getDriverTaskId().compareTo(o2.getDriverTaskId());
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
index 8a3d5280b3..fa1575d1f4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DefaultDriverSchedulerTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTaskHandle;
+import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.MultilevelPriorityQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
@@ -38,6 +40,7 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -58,6 +61,11 @@ public class DefaultDriverSchedulerTest {
     manager.setBlockManager(mockMPPDataExchangeManager);
     ITaskScheduler defaultScheduler = manager.getScheduler();
     IDriver mockDriver = Mockito.mock(IDriver.class);
+    DriverTaskHandle driverTaskHandle =
+        new DriverTaskHandle(
+            1,
+            (MultilevelPriorityQueue) manager.getReadyQueue(),
+            OptionalInt.of(Integer.MAX_VALUE));
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -71,7 +79,7 @@ public class DefaultDriverSchedulerTest {
           DriverTaskStatus.RUNNING,
         };
     for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+      DriverTask testTask = new DriverTask(mockDriver, 100L, status, driverTaskHandle);
       manager.getBlockedTasks().add(testTask);
       Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask);
@@ -82,13 +90,13 @@ public class DefaultDriverSchedulerTest {
       defaultScheduler.blockedToReady(testTask);
       Assert.assertEquals(status, testTask.getStatus());
       Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
       Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
       Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
       Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
       clear();
     }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
+    DriverTask testTask =
+        new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED, driverTaskHandle);
     manager.getBlockedTasks().add(testTask);
     Set<DriverTask> taskSet = new HashSet<>();
     taskSet.add(testTask);
@@ -99,7 +107,6 @@ public class DefaultDriverSchedulerTest {
     defaultScheduler.blockedToReady(testTask);
     Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
     Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNotNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
@@ -114,6 +121,11 @@ public class DefaultDriverSchedulerTest {
     manager.setBlockManager(mockMPPDataExchangeManager);
     ITaskScheduler defaultScheduler = manager.getScheduler();
     IDriver mockDriver = Mockito.mock(IDriver.class);
+    DriverTaskHandle driverTaskHandle =
+        new DriverTaskHandle(
+            1,
+            (MultilevelPriorityQueue) manager.getReadyQueue(),
+            OptionalInt.of(Integer.MAX_VALUE));
 
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId =
@@ -128,7 +140,7 @@ public class DefaultDriverSchedulerTest {
           DriverTaskStatus.RUNNING,
         };
     for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+      DriverTask testTask = new DriverTask(mockDriver, 100L, status, driverTaskHandle);
       Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask);
       Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -143,7 +155,8 @@ public class DefaultDriverSchedulerTest {
       Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
       clear();
     }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+    DriverTask testTask =
+        new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, driverTaskHandle);
     Set<DriverTask> taskSet = new HashSet<>();
     taskSet.add(testTask);
     Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -167,6 +180,11 @@ public class DefaultDriverSchedulerTest {
     manager.setBlockManager(mockMPPDataExchangeManager);
     ITaskScheduler defaultScheduler = manager.getScheduler();
     IDriver mockDriver = Mockito.mock(IDriver.class);
+    DriverTaskHandle driverTaskHandle =
+        new DriverTaskHandle(
+            1,
+            (MultilevelPriorityQueue) manager.getReadyQueue(),
+            OptionalInt.of(Integer.MAX_VALUE));
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -180,7 +198,7 @@ public class DefaultDriverSchedulerTest {
           DriverTaskStatus.READY,
         };
     for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+      DriverTask testTask = new DriverTask(mockDriver, 100L, status, driverTaskHandle);
       Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask);
       Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -190,13 +208,13 @@ public class DefaultDriverSchedulerTest {
       defaultScheduler.runningToReady(testTask, new ExecutionContext());
       Assert.assertEquals(status, testTask.getStatus());
       Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
       Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
       Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
       Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
       clear();
     }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+    DriverTask testTask =
+        new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING, driverTaskHandle);
     Set<DriverTask> taskSet = new HashSet<>();
     taskSet.add(testTask);
     Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -207,10 +225,9 @@ public class DefaultDriverSchedulerTest {
     context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
     context.setCpuDuration(new CpuTimer.CpuDuration());
     defaultScheduler.runningToReady(testTask, context);
-    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+    // getSchedulePriority() was removed; priority is now tracked by DriverTaskHandle.
     Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
     Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNotNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
@@ -225,6 +242,11 @@ public class DefaultDriverSchedulerTest {
     manager.setBlockManager(mockMPPDataExchangeManager);
     ITaskScheduler defaultScheduler = manager.getScheduler();
     IDriver mockDriver = Mockito.mock(IDriver.class);
+    DriverTaskHandle driverTaskHandle =
+        new DriverTaskHandle(
+            1,
+            (MultilevelPriorityQueue) manager.getReadyQueue(),
+            OptionalInt.of(Integer.MAX_VALUE));
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -238,7 +260,7 @@ public class DefaultDriverSchedulerTest {
           DriverTaskStatus.READY,
         };
     for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+      DriverTask testTask = new DriverTask(mockDriver, 100L, status, driverTaskHandle);
       Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask);
       Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -248,13 +270,13 @@ public class DefaultDriverSchedulerTest {
       defaultScheduler.runningToBlocked(testTask, new ExecutionContext());
       Assert.assertEquals(status, testTask.getStatus());
       Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
       Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
       Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
       Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
       clear();
     }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+    DriverTask testTask =
+        new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING, driverTaskHandle);
     Set<DriverTask> taskSet = new HashSet<>();
     taskSet.add(testTask);
     Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -265,10 +287,9 @@ public class DefaultDriverSchedulerTest {
     context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
     context.setCpuDuration(new CpuTimer.CpuDuration());
     defaultScheduler.runningToBlocked(testTask, context);
-    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+    // getSchedulePriority() was removed; priority is now tracked by DriverTaskHandle.
     Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
     Assert.assertTrue(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
@@ -283,6 +304,11 @@ public class DefaultDriverSchedulerTest {
     manager.setBlockManager(mockMPPDataExchangeManager);
     ITaskScheduler defaultScheduler = manager.getScheduler();
     IDriver mockDriver = Mockito.mock(IDriver.class);
+    DriverTaskHandle driverTaskHandle =
+        new DriverTaskHandle(
+            1,
+            (MultilevelPriorityQueue) manager.getReadyQueue(),
+            OptionalInt.of(Integer.MAX_VALUE));
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId =
         new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
@@ -296,7 +322,7 @@ public class DefaultDriverSchedulerTest {
           DriverTaskStatus.READY,
         };
     for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask = new DriverTask(mockDriver, 100L, status);
+      DriverTask testTask = new DriverTask(mockDriver, 100L, status, driverTaskHandle);
       Set<DriverTask> taskSet = new HashSet<>();
       taskSet.add(testTask);
       Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -306,13 +332,13 @@ public class DefaultDriverSchedulerTest {
       defaultScheduler.runningToFinished(testTask, new ExecutionContext());
       Assert.assertEquals(status, testTask.getStatus());
       Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-      Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
       Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
       Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
       Assert.assertTrue(manager.getQueryMap().get(queryId).get(instanceId).contains(testTask));
       clear();
     }
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+    DriverTask testTask =
+        new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING, driverTaskHandle);
     Set<DriverTask> taskSet = new HashSet<>();
     taskSet.add(testTask);
     Map<FragmentInstanceId, Set<DriverTask>> fragmentRelatedTask = new ConcurrentHashMap<>();
@@ -323,10 +349,9 @@ public class DefaultDriverSchedulerTest {
     context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
     context.setCpuDuration(new CpuTimer.CpuDuration());
     defaultScheduler.runningToFinished(testTask, context);
-    Assert.assertEquals(0.0D, testTask.getSchedulePriority(), 0.00001);
+    // getSchedulePriority() was removed; priority is now tracked by DriverTaskHandle.
     Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
     Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
-    Assert.assertNull(manager.getReadyQueue().get(testTask.getDriverTaskId()));
     Assert.assertNull(manager.getTimeoutQueue().get(testTask.getDriverTaskId()));
     Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
     Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
@@ -340,6 +365,11 @@ public class DefaultDriverSchedulerTest {
     manager.setBlockManager(mockMPPDataExchangeManager);
     IDataNodeRPCService.Client mockMppServiceClient =
         Mockito.mock(IDataNodeRPCService.Client.class);
+    DriverTaskHandle driverTaskHandle =
+        new DriverTaskHandle(
+            1,
+            (MultilevelPriorityQueue) manager.getReadyQueue(),
+            OptionalInt.of(Integer.MAX_VALUE));
     ITaskScheduler defaultScheduler = manager.getScheduler();
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId1 =
@@ -357,8 +387,10 @@ public class DefaultDriverSchedulerTest {
           DriverTaskStatus.FINISHED, DriverTaskStatus.ABORTED,
         };
     for (DriverTaskStatus status : invalidStates) {
-      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
-      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status, driverTaskHandle);
+      DriverTask testTask2 =
+          new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED, driverTaskHandle);
+
       Set<DriverTask> taskSet1 = new HashSet<>();
       taskSet1.add(testTask1);
       Set<DriverTask> taskSet2 = new HashSet<>();
@@ -376,8 +408,6 @@ public class DefaultDriverSchedulerTest {
       Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask2.getStatus());
       Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
       Assert.assertTrue(manager.getBlockedTasks().contains(testTask2));
-      Assert.assertNull(manager.getReadyQueue().get(testTask1.getDriverTaskId()));
-      Assert.assertNull(manager.getReadyQueue().get(testTask2.getDriverTaskId()));
       Assert.assertNotNull(manager.getTimeoutQueue().get(testTask1.getDriverTaskId()));
       Assert.assertNotNull(manager.getTimeoutQueue().get(testTask2.getDriverTaskId()));
       Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
@@ -398,9 +428,10 @@ public class DefaultDriverSchedulerTest {
       Mockito.reset(mockDriver2);
       Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
 
-      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status);
+      DriverTask testTask1 = new DriverTask(mockDriver1, 100L, status, driverTaskHandle);
 
-      DriverTask testTask2 = new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED);
+      DriverTask testTask2 =
+          new DriverTask(mockDriver2, 100L, DriverTaskStatus.BLOCKED, driverTaskHandle);
       Set<DriverTask> taskSet1 = new HashSet<>();
       taskSet1.add(testTask1);
       Set<DriverTask> taskSet2 = new HashSet<>();
@@ -422,8 +453,6 @@ public class DefaultDriverSchedulerTest {
       Assert.assertEquals(DriverTaskStatus.ABORTED, testTask2.getStatus());
       Assert.assertFalse(manager.getBlockedTasks().contains(testTask1));
       Assert.assertFalse(manager.getBlockedTasks().contains(testTask2));
-      Assert.assertNull(manager.getReadyQueue().get(testTask1.getDriverTaskId()));
-      Assert.assertNull(manager.getReadyQueue().get(testTask2.getDriverTaskId()));
       Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getDriverTaskId()));
       Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getDriverTaskId()));
       Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index d55b4ead86..d9d2a3f36c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -73,28 +73,28 @@ public class DriverTaskTimeoutSentinelThreadTest {
             "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
 
     // FINISHED status test
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.FINISHED);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.FINISHED, null);
     executor.execute(testTask);
     Assert.assertEquals(DriverTaskStatus.FINISHED, testTask.getStatus());
     Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
     Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // ABORTED status test
-    testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.ABORTED);
+    testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.ABORTED, null);
     executor.execute(testTask);
     Assert.assertEquals(DriverTaskStatus.ABORTED, testTask.getStatus());
     Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
     Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // RUNNING status test
-    testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING);
+    testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.RUNNING, null);
     executor.execute(testTask);
     Assert.assertEquals(DriverTaskStatus.RUNNING, testTask.getStatus());
     Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
     Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // BLOCKED status test
-    testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED);
+    testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.BLOCKED, null);
     executor.execute(testTask);
     Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
     Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
@@ -134,7 +134,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
     AbstractDriverThread executor =
         new DriverTaskThread(
             "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
     Assert.assertEquals(
@@ -175,7 +175,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
     AbstractDriverThread executor =
         new DriverTaskThread(
             "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
     Assert.assertNull(testTask.getAbortCause());
@@ -225,7 +225,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
     AbstractDriverThread executor =
         new DriverTaskThread(
             "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
     Assert.assertNull(testTask.getAbortCause());
@@ -276,7 +276,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
     AbstractDriverThread executor =
         new DriverTaskThread(
             "0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler, producer);
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
     Assert.assertNull(testTask.getAbortCause());
@@ -317,7 +317,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
               executor.close();
               throw new RuntimeException("mock exception");
             });
-    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY);
+    DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null);
     taskQueue.push(testTask);
     executor.run(); // Here we use run() instead of start() to execute the task in the same thread
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());