You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/01 09:17:56 UTC
[iotdb] 01/01: Allow query scheduler to receive more tasks
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch multiQueue
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a736d70950b3163b527c89d06508bbb440d420d8
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Feb 1 17:17:35 2023 +0800
Allow query scheduler to receive more tasks
---
.../db/mpp/execution/schedule/DriverScheduler.java | 51 +++++++++++++++++-----
.../schedule/queue/IndexedBlockingQueue.java | 38 ++++++++++++----
2 files changed, 70 insertions(+), 19 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index ddcf072a1b..3175d563bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -40,6 +40,8 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +94,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
this.readyQueue =
new MultilevelPriorityQueue(LEVEL_TIME_MULTIPLIER, MAX_CAPACITY, new DriverTask());
this.timeoutQueue =
- new L1PriorityQueue<>(MAX_CAPACITY, new DriverTask.TimeoutComparator(), new DriverTask());
+ new L1PriorityQueue<>(
+ Integer.MAX_VALUE, new DriverTask.TimeoutComparator(), new DriverTask());
this.queryMap = new ConcurrentHashMap<>();
this.blockedTasks = Collections.synchronizedSet(new HashSet<>());
this.scheduler = new Scheduler();
@@ -198,9 +201,18 @@ public class DriverScheduler implements IDriverScheduler, IService {
if (task.getStatus() != DriverTaskStatus.READY) {
continue;
}
- timeoutQueue.push(task);
- readyQueue.push(task);
- task.setLastEnterReadyQueueTime(System.nanoTime());
+ SettableFuture<?> isBlocked = readyQueue.push(task);
+ if (isBlocked.isDone()) {
+ timeoutQueue.push(task);
+ task.setLastEnterReadyQueueTime(System.nanoTime());
+ } else {
+ isBlocked.addListener(
+ () -> {
+ timeoutQueue.push(task);
+ task.setLastEnterReadyQueueTime(System.nanoTime());
+ },
+ MoreExecutors.directExecutor());
+ }
} finally {
task.unlock();
}
@@ -362,12 +374,23 @@ public class DriverScheduler implements IDriverScheduler, IService {
}
task.setStatus(DriverTaskStatus.READY);
- QUERY_METRICS.recordTaskQueueTime(
- BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
- task.setLastEnterReadyQueueTime(System.nanoTime());
- task.resetLevelScheduledTime();
- readyQueue.push(task);
blockedTasks.remove(task);
+ SettableFuture<?> isBlocked = readyQueue.push(task);
+ if (isBlocked.isDone()) {
+ QUERY_METRICS.recordTaskQueueTime(
+ BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
+ task.setLastEnterReadyQueueTime(System.nanoTime());
+ task.resetLevelScheduledTime();
+ } else {
+ isBlocked.addListener(
+ () -> {
+ QUERY_METRICS.recordTaskQueueTime(
+ BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
+ task.setLastEnterReadyQueueTime(System.nanoTime());
+ task.resetLevelScheduledTime();
+ },
+ MoreExecutors.directExecutor());
+ }
} finally {
task.unlock();
}
@@ -399,8 +422,14 @@ public class DriverScheduler implements IDriverScheduler, IService {
}
task.updateSchedulePriority(context);
task.setStatus(DriverTaskStatus.READY);
- task.setLastEnterReadyQueueTime(System.nanoTime());
- readyQueue.push(task);
+ SettableFuture<?> isBlocked = readyQueue.push(task);
+ if (isBlocked.isDone()) {
+ task.setLastEnterReadyQueueTime(System.nanoTime());
+ } else {
+ isBlocked.addListener(
+ () -> task.setLastEnterReadyQueueTime(System.nanoTime()),
+ MoreExecutors.directExecutor());
+ }
} finally {
task.unlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java
index 1498bbbc30..1e39eb8f08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.db.mpp.execution.schedule.queue;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.Deque;
+import java.util.LinkedList;
/**
* The base class of a special kind of blocking queue, which has these characters:
@@ -40,6 +44,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
private final int MAX_CAPACITY;
private final E queryHolder;
private int size;
+ private Deque<SettableFuture<?>> blockedTasks;
/**
* Init the queue with a max capacity. The queryHolder is just a simple reused object in query to
@@ -53,6 +58,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
public IndexedBlockingQueue(int maxCapacity, E queryHolder) {
this.MAX_CAPACITY = maxCapacity;
this.queryHolder = queryHolder;
+ this.blockedTasks = new LinkedList<>();
}
/**
@@ -67,31 +73,38 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
}
E output = pollFirst();
size--;
+ tryToAddBlockedTask();
return output;
}
/**
* Push an element to the queue. The new element position is determined by the implementation. If
- * the queue size has been reached the maxCapacity, or the queue has already contained an element
- * with the same ID, an {@link IllegalStateException} will be thrown. If the element is null, an
- * {@link NullPointerException} will be thrown.
+ * the queue has already contained an element with the same ID, an {@link IllegalStateException}
+ * will be thrown. If the queue size has been reached the maxCapacity, the element will wait a
+ * free space. If the element is null, an {@link NullPointerException} will be thrown.
*
* @param element the element to be pushed.
* @throws NullPointerException the pushed element is null.
* @throws IllegalStateException the queue size has been reached the maxCapacity, or the queue has
* already contained the same ID element .
*/
- public synchronized void push(E element) {
+ public synchronized SettableFuture<?> push(E element) {
if (element == null) {
throw new NullPointerException("pushed element is null");
}
Preconditions.checkState(
!contains(element),
"The queue has already contained the element: " + element.getDriverTaskId());
- Preconditions.checkState(size < MAX_CAPACITY, "The queue is full");
- pushToQueue(element);
- size++;
- this.notifyAll();
+ SettableFuture<?> blockedFuture = SettableFuture.create();
+ if (size < MAX_CAPACITY) {
+ pushToQueue(element);
+ size++;
+ this.notifyAll();
+ blockedFuture.set(null);
+ } else {
+ blockedTasks.add(blockedFuture);
+ }
+ return blockedFuture;
}
/**
@@ -107,6 +120,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
return null;
}
size--;
+ tryToAddBlockedTask();
return output;
}
@@ -124,9 +138,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
/** Clear all the elements in the queue. */
public synchronized void clear() {
clearAllElements();
+ blockedTasks.clear();
size = 0;
}
+ public synchronized void tryToAddBlockedTask() {
+ if (!blockedTasks.isEmpty()) {
+ SettableFuture<?> blockedFuture = blockedTasks.pollFirst();
+ blockedFuture.set(null);
+ }
+ }
+
/**
* Get the current queue size.
*