You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/01 09:17:55 UTC

[iotdb] branch multiQueue created (now a736d70950)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch multiQueue
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at a736d70950 Allow query scheduler to receive more tasks

This branch includes the following new commits:

     new a736d70950 Allow query scheduler to receive more tasks

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Allow query scheduler to receive more tasks

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch multiQueue
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a736d70950b3163b527c89d06508bbb440d420d8
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Feb 1 17:17:35 2023 +0800

    Allow query scheduler to receive more tasks
---
 .../db/mpp/execution/schedule/DriverScheduler.java | 51 +++++++++++++++++-----
 .../schedule/queue/IndexedBlockingQueue.java       | 38 ++++++++++++----
 2 files changed, 70 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index ddcf072a1b..3175d563bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -40,6 +40,8 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,7 +94,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
     this.readyQueue =
         new MultilevelPriorityQueue(LEVEL_TIME_MULTIPLIER, MAX_CAPACITY, new DriverTask());
     this.timeoutQueue =
-        new L1PriorityQueue<>(MAX_CAPACITY, new DriverTask.TimeoutComparator(), new DriverTask());
+        new L1PriorityQueue<>(
+            Integer.MAX_VALUE, new DriverTask.TimeoutComparator(), new DriverTask());
     this.queryMap = new ConcurrentHashMap<>();
     this.blockedTasks = Collections.synchronizedSet(new HashSet<>());
     this.scheduler = new Scheduler();
@@ -198,9 +201,18 @@ public class DriverScheduler implements IDriverScheduler, IService {
         if (task.getStatus() != DriverTaskStatus.READY) {
           continue;
         }
-        timeoutQueue.push(task);
-        readyQueue.push(task);
-        task.setLastEnterReadyQueueTime(System.nanoTime());
+        SettableFuture<?> isBlocked = readyQueue.push(task);
+        if (isBlocked.isDone()) {
+          timeoutQueue.push(task);
+          task.setLastEnterReadyQueueTime(System.nanoTime());
+        } else {
+          isBlocked.addListener(
+              () -> {
+                timeoutQueue.push(task);
+                task.setLastEnterReadyQueueTime(System.nanoTime());
+              },
+              MoreExecutors.directExecutor());
+        }
       } finally {
         task.unlock();
       }
@@ -362,12 +374,23 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
 
         task.setStatus(DriverTaskStatus.READY);
-        QUERY_METRICS.recordTaskQueueTime(
-            BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
-        task.setLastEnterReadyQueueTime(System.nanoTime());
-        task.resetLevelScheduledTime();
-        readyQueue.push(task);
         blockedTasks.remove(task);
+        SettableFuture<?> isBlocked = readyQueue.push(task);
+        if (isBlocked.isDone()) {
+          QUERY_METRICS.recordTaskQueueTime(
+              BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
+          task.setLastEnterReadyQueueTime(System.nanoTime());
+          task.resetLevelScheduledTime();
+        } else {
+          isBlocked.addListener(
+              () -> {
+                QUERY_METRICS.recordTaskQueueTime(
+                    BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
+                task.setLastEnterReadyQueueTime(System.nanoTime());
+                task.resetLevelScheduledTime();
+              },
+              MoreExecutors.directExecutor());
+        }
       } finally {
         task.unlock();
       }
@@ -399,8 +422,14 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.READY);
-        task.setLastEnterReadyQueueTime(System.nanoTime());
-        readyQueue.push(task);
+        SettableFuture<?> isBlocked = readyQueue.push(task);
+        if (isBlocked.isDone()) {
+          task.setLastEnterReadyQueueTime(System.nanoTime());
+        } else {
+          isBlocked.addListener(
+              () -> task.setLastEnterReadyQueueTime(System.nanoTime()),
+              MoreExecutors.directExecutor());
+        }
       } finally {
         task.unlock();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java
index 1498bbbc30..1e39eb8f08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java
@@ -19,6 +19,10 @@
 package org.apache.iotdb.db.mpp.execution.schedule.queue;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.Deque;
+import java.util.LinkedList;
 
 /**
  * The base class of a special kind of blocking queue, which has these characters:
@@ -40,6 +44,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   private final int MAX_CAPACITY;
   private final E queryHolder;
   private int size;
+  private Deque<SettableFuture<?>> blockedTasks;
 
   /**
    * Init the queue with a max capacity. The queryHolder is just a simple reused object in query to
@@ -53,6 +58,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   public IndexedBlockingQueue(int maxCapacity, E queryHolder) {
     this.MAX_CAPACITY = maxCapacity;
     this.queryHolder = queryHolder;
+    this.blockedTasks = new LinkedList<>();
   }
 
   /**
@@ -67,31 +73,38 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
     }
     E output = pollFirst();
     size--;
+    tryToAddBlockedTask();
     return output;
   }
 
   /**
    * Push an element to the queue. The new element position is determined by the implementation. If
-   * the queue size has been reached the maxCapacity, or the queue has already contained an element
-   * with the same ID, an {@link IllegalStateException} will be thrown. If the element is null, an
-   * {@link NullPointerException} will be thrown.
+   * the queue already contains an element with the same ID, an {@link IllegalStateException}
+   * will be thrown. If the queue size has reached maxCapacity, the element will wait for
+   * free space. If the element is null, a {@link NullPointerException} will be thrown.
    *
    * @param element the element to be pushed.
    * @throws NullPointerException the pushed element is null.
    * @throws IllegalStateException the queue size has been reached the maxCapacity, or the queue has
    *     already contained the same ID element .
    */
-  public synchronized void push(E element) {
+  public synchronized SettableFuture<?> push(E element) {
     if (element == null) {
       throw new NullPointerException("pushed element is null");
     }
     Preconditions.checkState(
         !contains(element),
         "The queue has already contained the element: " + element.getDriverTaskId());
-    Preconditions.checkState(size < MAX_CAPACITY, "The queue is full");
-    pushToQueue(element);
-    size++;
-    this.notifyAll();
+    SettableFuture<?> blockedFuture = SettableFuture.create();
+    if (size < MAX_CAPACITY) {
+      pushToQueue(element);
+      size++;
+      this.notifyAll();
+      blockedFuture.set(null);
+    } else {
+      blockedTasks.add(blockedFuture);
+    }
+    return blockedFuture;
   }
 
   /**
@@ -107,6 +120,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
       return null;
     }
     size--;
+    tryToAddBlockedTask();
     return output;
   }
 
@@ -124,9 +138,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   /** Clear all the elements in the queue. */
   public synchronized void clear() {
     clearAllElements();
+    blockedTasks.clear();
     size = 0;
   }
 
+  public synchronized void tryToAddBlockedTask() {
+    if (!blockedTasks.isEmpty()) {
+      SettableFuture<?> blockedFuture = blockedTasks.pollFirst();
+      blockedFuture.set(null);
+    }
+  }
+
   /**
    * Get the current queue size.
    *