You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:00:54 UTC
[iotdb] 04/13: implement consumeOneByOneChildren pipeline divided by dop
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 44dbfc316935443432b874ee05c091f8d603704f
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Feb 9 19:35:51 2023 +0800
implement consumeOneByOneChildren pipeline divided by dop
---
.../iotdb/db/mpp/execution/driver/Driver.java | 15 +++-
.../db/mpp/execution/driver/DriverContext.java | 9 ++
.../db/mpp/execution/schedule/DriverScheduler.java | 95 ++++++++++++++--------
.../db/mpp/execution/schedule/task/DriverTask.java | 11 +++
.../plan/planner/LocalExecutionPlanContext.java | 5 ++
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 82 +++++++++++++++++--
.../db/mpp/plan/planner/PipelineDriverFactory.java | 11 ++-
7 files changed, 184 insertions(+), 44 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 14554bb8fb..a1df81be83 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -29,11 +29,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
+import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.GuardedBy;
-
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -91,6 +90,10 @@ public abstract class Driver implements IDriver {
return result.orElseGet(() -> state.get() != State.ALIVE || driverContext.isDone());
}
+ public DriverContext getDriverContext() {
+ return driverContext;
+ }
+
/**
* do initialization
*
@@ -101,6 +104,14 @@ public abstract class Driver implements IDriver {
/** release resource this driver used */
protected abstract void releaseResource();
+ public boolean hasDependency() {
+ return driverContext.getDependencyDriverIndex() != -1;
+ }
+
+ public int getDependencyDriverIndex() {
+ return driverContext.getDependencyDriverIndex();
+ }
+
@Override
public ListenableFuture<?> processFor(Duration duration) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 95a5861978..ea369d8fad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -39,6 +39,7 @@ public class DriverContext {
private final List<OperatorContext> operatorContexts = new ArrayList<>();
private ISinkHandle sinkHandle;
private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+ private int dependencyDriverIndex = -1;
private final AtomicBoolean finished = new AtomicBoolean();
@@ -69,6 +70,14 @@ public class DriverContext {
throw new UnsupportedOperationException();
}
+ public void setDependencyDriverIndex(int dependencyDriverIndex) {
+ this.dependencyDriverIndex = dependencyDriverIndex;
+ }
+
+ public int getDependencyDriverIndex() {
+ return dependencyDriverIndex;
+ }
+
public void setSinkHandle(ISinkHandle sinkHandle) {
this.sinkHandle = sinkHandle;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 7188717d20..f959007648 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.Driver;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
@@ -39,6 +40,8 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +55,6 @@ import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import static org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
import static org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
@@ -175,43 +177,67 @@ public class DriverScheduler implements IDriverScheduler, IService {
getNextDriverTaskHandleId(),
(MultilevelPriorityQueue) readyQueue,
OptionalInt.of(Integer.MAX_VALUE));
- List<DriverTask> tasks =
- drivers.stream()
- .map(
- v ->
- new DriverTask(
- v,
- timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS,
- DriverTaskStatus.READY,
- driverTaskHandle))
- .collect(Collectors.toList());
+ List<DriverTask> tasks = new ArrayList<>();
+ drivers.forEach(
+ driver ->
+ tasks.add(
+ new DriverTask(
+ driver,
+ timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS,
+ DriverTaskStatus.READY,
+ driverTaskHandle)));
+
+ List<DriverTask> submittedTasks = new ArrayList<>();
+ for (DriverTask task : tasks) {
+ Driver driver = (Driver) task.getDriver();
+ if (driver.hasDependency()) {
+ SettableFuture<?> blockedDependencyFuture =
+ tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
+ blockedDependencyFuture.addListener(
+ () -> {
+ registerTaskToQueryMap(queryId, task);
+ submitTaskToReadyQueue(task);
+ },
+ MoreExecutors.directExecutor());
+ } else {
+ submittedTasks.add(task);
+ }
+ }
+
+ for (DriverTask task : submittedTasks) {
+ registerTaskToQueryMap(queryId, task);
+ }
+ for (DriverTask task : submittedTasks) {
+ submitTaskToReadyQueue(task);
+ }
+ }
+
+ public void registerTaskToQueryMap(QueryId queryId, DriverTask driverTask) {
// If query has not been registered by other fragment instances,
// add the first task as timeout checking task to timeoutQueue.
- for (DriverTask driverTask : tasks) {
- queryMap
- .computeIfAbsent(
- queryId,
- v -> {
- timeoutQueue.push(tasks.get(0));
- return new ConcurrentHashMap<>();
- })
- .computeIfAbsent(
- driverTask.getDriverTaskId().getFragmentInstanceId(),
- v -> Collections.synchronizedSet(new HashSet<>()))
- .add(driverTask);
- }
+ queryMap
+ .computeIfAbsent(
+ queryId,
+ v -> {
+ timeoutQueue.push(driverTask);
+ return new ConcurrentHashMap<>();
+ })
+ .computeIfAbsent(
+ driverTask.getDriverTaskId().getFragmentInstanceId(),
+ v -> Collections.synchronizedSet(new HashSet<>()))
+ .add(driverTask);
+ }
- for (DriverTask task : tasks) {
- task.lock();
- try {
- if (task.getStatus() != DriverTaskStatus.READY) {
- continue;
- }
- readyQueue.push(task);
- task.setLastEnterReadyQueueTime(System.nanoTime());
- } finally {
- task.unlock();
+ public void submitTaskToReadyQueue(DriverTask task) {
+ task.lock();
+ try {
+ if (task.getStatus() != DriverTaskStatus.READY) {
+ return;
}
+ readyQueue.push(task);
+ task.setLastEnterReadyQueueTime(System.nanoTime());
+ } finally {
+ task.unlock();
}
}
@@ -448,6 +474,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
task.updateSchedulePriority(context);
task.setStatus(DriverTaskStatus.FINISHED);
clearDriverTask(task);
+ task.submitDependencyDriver();
} finally {
task.unlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index e008b84210..b4c501a158 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTa
import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import java.util.Comparator;
@@ -58,6 +59,8 @@ public class DriverTask implements IDIndexedAccessible {
private long lastEnterReadyQueueTime;
private long lastEnterBlockQueueTime;
+ private SettableFuture<?> blockedDependencyDriver = SettableFuture.create();
+
/** Initialize a dummy instance for queryHolder */
public DriverTask() {
this(new StubFragmentInstance(), 0L, null, null);
@@ -137,6 +140,14 @@ public class DriverTask implements IDIndexedAccessible {
this.abortCause = abortCause;
}
+ public void submitDependencyDriver() {
+ this.blockedDependencyDriver.set(null);
+ }
+
+ public SettableFuture<?> getBlockedDependencyDriver() {
+ return blockedDependencyDriver;
+ }
+
public Priority getPriority() {
return priority.get();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 32988da5ca..3132783e3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -98,6 +98,7 @@ public class LocalExecutionPlanContext {
this.dataRegionTTL = parentContext.dataRegionTTL;
this.nextPipelineId = parentContext.nextPipelineId;
this.pipelineDriverFactories = parentContext.pipelineDriverFactories;
+ this.degreeOfParallelism = parentContext.degreeOfParallelism;
this.exchangeSumNum = parentContext.exchangeSumNum;
this.exchangeOperatorList = parentContext.exchangeOperatorList;
this.cachedDataTypes = parentContext.cachedDataTypes;
@@ -139,6 +140,10 @@ public class LocalExecutionPlanContext {
return pipelineDriverFactories;
}
+ public int getPipelineNumber() {
+ return nextPipelineId.get();
+ }
+
public DriverContext getDriverContext() {
return driverContext;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index ad2a5163ce..fd54e09b2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2303,16 +2303,84 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
private List<Operator> dealWithConsumeChildrenOneByOneNode(
PlanNode node, LocalExecutionPlanContext context) {
+ List<Operator> parentPipelineChildren = new ArrayList<>();
int originExchangeNum = context.getExchangeSumNum();
int finalExchangeNum = context.getExchangeSumNum();
- List<Operator> children = new ArrayList<>();
- for (PlanNode childSource : node.getChildren()) {
- Operator childOperation = childSource.accept(this, context);
- finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum());
- context.setExchangeSumNum(originExchangeNum);
- children.add(childOperation);
+
+ // 1. divide every child to pipeline using the max dop
+ if (context.getDegreeOfParallelism() == 1) {
+ // If dop = 1, we don't create extra pipeline
+ for (PlanNode childSource : node.getChildren()) {
+ Operator childOperation = childSource.accept(this, context);
+ finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum());
+ context.setExchangeSumNum(originExchangeNum);
+ parentPipelineChildren.add(childOperation);
+ }
+ } else {
+ List<Integer> childPipelineNums = new ArrayList<>();
+ int sumOfChildPipelines = 0;
+ int dependencyChildNode = 0, dependencyPipeId = 0;
+ for (PlanNode childNode : node.getChildren()) {
+ if (childNode instanceof ExchangeNode) {
+ Operator childOperation = childNode.accept(this, context);
+ finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum());
+ context.setExchangeSumNum(originExchangeNum);
+ parentPipelineChildren.add(childOperation);
+ } else {
+ LocalExecutionPlanContext subContext = context.createSubContext();
+ // Only context.getDegreeOfParallelism() - 1 can be allocated to child
+ int dopForChild = context.getDegreeOfParallelism() - 1;
+ subContext.setDegreeOfParallelism(dopForChild);
+ int originPipeNum = context.getPipelineNumber();
+ Operator childOperation = childNode.accept(this, subContext);
+ ISinkHandle localSinkHandle =
+ MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
+ // Attention, there is no parent node, use first child node instead
+ context.getDriverContext(), childNode.getPlanNodeId().getId());
+ subContext.setSinkHandle(localSinkHandle);
+ subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
+
+ int curChildPipelineNum = subContext.getPipelineNumber() - originPipeNum;
+ childPipelineNums.add(curChildPipelineNum);
+ sumOfChildPipelines += curChildPipelineNum;
+ // If sumOfChildPipelines > dopForChild, we have to wait until some pipelines finish
+ if (sumOfChildPipelines > dopForChild) {
+ // Update dependencyPipeId, after which finishes we can submit curChildPipeline
+ while (sumOfChildPipelines > dopForChild) {
+ dependencyPipeId = context.getPipelineNumber() - sumOfChildPipelines;
+ sumOfChildPipelines -= childPipelineNums.get(dependencyChildNode);
+ dependencyChildNode++;
+ }
+ }
+ // Add dependency for all pipelines under current node
+ if (dependencyChildNode != 0) {
+ for (int i = originPipeNum; i < subContext.getPipelineNumber(); i++) {
+ context.getPipelineDriverFactories().get(i).setDependencyPipeline(dependencyPipeId);
+ }
+ }
+
+ ExchangeOperator sourceOperator =
+ new ExchangeOperator(
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ null,
+ ExchangeOperator.class.getSimpleName()),
+ MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
+ ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+ context.getDriverContext()),
+ childNode.getPlanNodeId());
+ context
+ .getTimeSliceAllocator()
+ .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
+ parentPipelineChildren.add(sourceOperator);
+ context.addExchangeOperator(sourceOperator);
+ finalExchangeNum = Math.max(finalExchangeNum, subContext.getExchangeSumNum() + 1);
+ }
+ }
}
context.setExchangeSumNum(finalExchangeNum);
- return children;
+ return parentPipelineChildren;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index ec5eb7a3e2..fc21ed5c59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -31,6 +31,7 @@ public class PipelineDriverFactory {
private final DriverContext driverContext;
// TODO Use OperatorFactory to replace operator to generate multiple drivers for on pipeline
private final Operator operation;
+ private int dependencyPipelineIndex = -1;
public PipelineDriverFactory(Operator operation, DriverContext driverContext) {
this.operation = requireNonNull(operation, "rootOperator is null");
@@ -44,7 +45,11 @@ public class PipelineDriverFactory {
public Driver createDriver() {
requireNonNull(driverContext, "driverContext is null");
try {
- return new DataDriver(operation, driverContext);
+ Driver driver = new DataDriver(operation, driverContext);
+ if (dependencyPipelineIndex != -1) {
+ driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
+ }
+ return driver;
} catch (Throwable failure) {
try {
operation.close();
@@ -56,4 +61,8 @@ public class PipelineDriverFactory {
throw failure;
}
}
+
+ public void setDependencyPipeline(int dependencyPipelineIndex) {
+ this.dependencyPipelineIndex = dependencyPipelineIndex;
+ }
}