You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/01 09:17:56 UTC

[iotdb] 01/01: Allow query scheduler to receive more tasks

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch multiQueue
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a736d70950b3163b527c89d06508bbb440d420d8
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Feb 1 17:17:35 2023 +0800

    Allow query scheduler to receive more tasks
---
 .../db/mpp/execution/schedule/DriverScheduler.java | 51 +++++++++++++++++-----
 .../schedule/queue/IndexedBlockingQueue.java       | 38 ++++++++++++----
 2 files changed, 70 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index ddcf072a1b..3175d563bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -40,6 +40,8 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,7 +94,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
     this.readyQueue =
         new MultilevelPriorityQueue(LEVEL_TIME_MULTIPLIER, MAX_CAPACITY, new DriverTask());
     this.timeoutQueue =
-        new L1PriorityQueue<>(MAX_CAPACITY, new DriverTask.TimeoutComparator(), new DriverTask());
+        new L1PriorityQueue<>(
+            Integer.MAX_VALUE, new DriverTask.TimeoutComparator(), new DriverTask());
     this.queryMap = new ConcurrentHashMap<>();
     this.blockedTasks = Collections.synchronizedSet(new HashSet<>());
     this.scheduler = new Scheduler();
@@ -198,9 +201,18 @@ public class DriverScheduler implements IDriverScheduler, IService {
         if (task.getStatus() != DriverTaskStatus.READY) {
           continue;
         }
-        timeoutQueue.push(task);
-        readyQueue.push(task);
-        task.setLastEnterReadyQueueTime(System.nanoTime());
+        SettableFuture<?> isBlocked = readyQueue.push(task);
+        if (isBlocked.isDone()) {
+          timeoutQueue.push(task);
+          task.setLastEnterReadyQueueTime(System.nanoTime());
+        } else {
+          isBlocked.addListener(
+              () -> {
+                timeoutQueue.push(task);
+                task.setLastEnterReadyQueueTime(System.nanoTime());
+              },
+              MoreExecutors.directExecutor());
+        }
       } finally {
         task.unlock();
       }
@@ -362,12 +374,23 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
 
         task.setStatus(DriverTaskStatus.READY);
-        QUERY_METRICS.recordTaskQueueTime(
-            BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
-        task.setLastEnterReadyQueueTime(System.nanoTime());
-        task.resetLevelScheduledTime();
-        readyQueue.push(task);
         blockedTasks.remove(task);
+        SettableFuture<?> isBlocked = readyQueue.push(task);
+        if (isBlocked.isDone()) {
+          QUERY_METRICS.recordTaskQueueTime(
+              BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
+          task.setLastEnterReadyQueueTime(System.nanoTime());
+          task.resetLevelScheduledTime();
+        } else {
+          isBlocked.addListener(
+              () -> {
+                QUERY_METRICS.recordTaskQueueTime(
+                    BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
+                task.setLastEnterReadyQueueTime(System.nanoTime());
+                task.resetLevelScheduledTime();
+              },
+              MoreExecutors.directExecutor());
+        }
       } finally {
         task.unlock();
       }
@@ -399,8 +422,14 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.READY);
-        task.setLastEnterReadyQueueTime(System.nanoTime());
-        readyQueue.push(task);
+        SettableFuture<?> isBlocked = readyQueue.push(task);
+        if (isBlocked.isDone()) {
+          task.setLastEnterReadyQueueTime(System.nanoTime());
+        } else {
+          isBlocked.addListener(
+              () -> task.setLastEnterReadyQueueTime(System.nanoTime()),
+              MoreExecutors.directExecutor());
+        }
       } finally {
         task.unlock();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java
index 1498bbbc30..1e39eb8f08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java
@@ -19,6 +19,10 @@
 package org.apache.iotdb.db.mpp.execution.schedule.queue;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.Deque;
+import java.util.LinkedList;
 
 /**
  * The base class of a special kind of blocking queue, which has these characters:
@@ -40,6 +44,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   private final int MAX_CAPACITY;
   private final E queryHolder;
   private int size;
+  private Deque<SettableFuture<?>> blockedTasks;
 
   /**
    * Init the queue with a max capacity. The queryHolder is just a simple reused object in query to
@@ -53,6 +58,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   public IndexedBlockingQueue(int maxCapacity, E queryHolder) {
     this.MAX_CAPACITY = maxCapacity;
     this.queryHolder = queryHolder;
+    this.blockedTasks = new LinkedList<>();
   }
 
   /**
@@ -67,31 +73,38 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
     }
     E output = pollFirst();
     size--;
+    tryToAddBlockedTask();
     return output;
   }
 
   /**
    * Push an element to the queue. The new element position is determined by the implementation. If
-   * the queue size has been reached the maxCapacity, or the queue has already contained an element
-   * with the same ID, an {@link IllegalStateException} will be thrown. If the element is null, an
-   * {@link NullPointerException} will be thrown.
+   * the queue already contains an element with the same ID, an {@link IllegalStateException}
+   * will be thrown. If the queue size has reached maxCapacity, the element will wait for
+   * free space. If the element is null, a {@link NullPointerException} will be thrown.
    *
    * @param element the element to be pushed.
    * @throws NullPointerException the pushed element is null.
    * @throws IllegalStateException the queue size has been reached the maxCapacity, or the queue has
    *     already contained the same ID element .
    */
-  public synchronized void push(E element) {
+  public synchronized SettableFuture<?> push(E element) {
     if (element == null) {
       throw new NullPointerException("pushed element is null");
     }
     Preconditions.checkState(
         !contains(element),
         "The queue has already contained the element: " + element.getDriverTaskId());
-    Preconditions.checkState(size < MAX_CAPACITY, "The queue is full");
-    pushToQueue(element);
-    size++;
-    this.notifyAll();
+    SettableFuture<?> blockedFuture = SettableFuture.create();
+    if (size < MAX_CAPACITY) {
+      pushToQueue(element);
+      size++;
+      this.notifyAll();
+      blockedFuture.set(null);
+    } else {
+      blockedTasks.add(blockedFuture);
+    }
+    return blockedFuture;
   }
 
   /**
@@ -107,6 +120,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
       return null;
     }
     size--;
+    tryToAddBlockedTask();
     return output;
   }
 
@@ -124,9 +138,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   /** Clear all the elements in the queue. */
   public synchronized void clear() {
     clearAllElements();
+    blockedTasks.clear();
     size = 0;
   }
 
+  public synchronized void tryToAddBlockedTask() {
+    if (!blockedTasks.isEmpty()) {
+      SettableFuture<?> blockedFuture = blockedTasks.pollFirst();
+      blockedFuture.set(null);
+    }
+  }
+
   /**
    * Get the current queue size.
    *