You are viewing a plain-text version of this content. The canonical (HTML) version is linked from the original mailing-list archive page.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:00:54 UTC

[iotdb] 04/13: implement consumeOneByOneChildren pipeline divided by dop

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 44dbfc316935443432b874ee05c091f8d603704f
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Feb 9 19:35:51 2023 +0800

    implement consumeOneByOneChildren pipeline divided by dop
---
 .../iotdb/db/mpp/execution/driver/Driver.java      | 15 +++-
 .../db/mpp/execution/driver/DriverContext.java     |  9 ++
 .../db/mpp/execution/schedule/DriverScheduler.java | 95 ++++++++++++++--------
 .../db/mpp/execution/schedule/task/DriverTask.java | 11 +++
 .../plan/planner/LocalExecutionPlanContext.java    |  5 ++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 82 +++++++++++++++++--
 .../db/mpp/plan/planner/PipelineDriverFactory.java | 11 ++-
 7 files changed, 184 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 14554bb8fb..a1df81be83 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -29,11 +29,10 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import io.airlift.units.Duration;
+import javax.annotation.concurrent.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
-
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -91,6 +90,10 @@ public abstract class Driver implements IDriver {
     return result.orElseGet(() -> state.get() != State.ALIVE || driverContext.isDone());
   }
 
+  public DriverContext getDriverContext() {
+    return driverContext;
+  }
+
   /**
    * do initialization
    *
@@ -101,6 +104,14 @@ public abstract class Driver implements IDriver {
   /** release resource this driver used */
   protected abstract void releaseResource();
 
+  public boolean hasDependency() {
+    return driverContext.getDependencyDriverIndex() != -1;
+  }
+
+  public int getDependencyDriverIndex() {
+    return driverContext.getDependencyDriverIndex();
+  }
+
   @Override
   public ListenableFuture<?> processFor(Duration duration) {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 95a5861978..ea369d8fad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -39,6 +39,7 @@ public class DriverContext {
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
   private ISinkHandle sinkHandle;
   private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+  private int dependencyDriverIndex = -1;
 
   private final AtomicBoolean finished = new AtomicBoolean();
 
@@ -69,6 +70,14 @@ public class DriverContext {
     throw new UnsupportedOperationException();
   }
 
+  public void setDependencyDriverIndex(int dependencyDriverIndex) {
+    this.dependencyDriverIndex = dependencyDriverIndex;
+  }
+
+  public int getDependencyDriverIndex() {
+    return dependencyDriverIndex;
+  }
+
   public void setSinkHandle(ISinkHandle sinkHandle) {
     this.sinkHandle = sinkHandle;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 7188717d20..f959007648 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.Driver;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
@@ -39,6 +40,8 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +55,6 @@ import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
 import static org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
@@ -175,43 +177,67 @@ public class DriverScheduler implements IDriverScheduler, IService {
             getNextDriverTaskHandleId(),
             (MultilevelPriorityQueue) readyQueue,
             OptionalInt.of(Integer.MAX_VALUE));
-    List<DriverTask> tasks =
-        drivers.stream()
-            .map(
-                v ->
-                    new DriverTask(
-                        v,
-                        timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS,
-                        DriverTaskStatus.READY,
-                        driverTaskHandle))
-            .collect(Collectors.toList());
+    List<DriverTask> tasks = new ArrayList<>();
+    drivers.forEach(
+        driver ->
+            tasks.add(
+                new DriverTask(
+                    driver,
+                    timeOut > 0 ? timeOut : QUERY_TIMEOUT_MS,
+                    DriverTaskStatus.READY,
+                    driverTaskHandle)));
+
+    // Register every task up front, including tasks still waiting for a dependency. If the
+    // dependency is aborted it never calls submitDependencyDriver(); an unregistered task would
+    // then be unreachable via queryMap (presumably what abort/timeout handling walks) and leak.
+    for (DriverTask task : tasks) {
+      registerTaskToQueryMap(queryId, task);
+    }
+
+    List<DriverTask> submittedTasks = new ArrayList<>();
+    for (DriverTask task : tasks) {
+      Driver driver = (Driver) task.getDriver();
+      if (driver.hasDependency()) {
+        // Enqueue only once the dependency finishes; non-READY (e.g. aborted) tasks are skipped.
+        SettableFuture<?> blockedDependencyFuture =
+            tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
+        blockedDependencyFuture.addListener(
+            () -> submitTaskToReadyQueue(task), MoreExecutors.directExecutor());
+      } else {
+        submittedTasks.add(task);
+      }
+    }
+    for (DriverTask task : submittedTasks) {
+      submitTaskToReadyQueue(task);
+    }
+  }
+
+  public void registerTaskToQueryMap(QueryId queryId, DriverTask driverTask) {
     // If query has not been registered by other fragment instances,
     // add the first task as timeout checking task to timeoutQueue.
-    for (DriverTask driverTask : tasks) {
-      queryMap
-          .computeIfAbsent(
-              queryId,
-              v -> {
-                timeoutQueue.push(tasks.get(0));
-                return new ConcurrentHashMap<>();
-              })
-          .computeIfAbsent(
-              driverTask.getDriverTaskId().getFragmentInstanceId(),
-              v -> Collections.synchronizedSet(new HashSet<>()))
-          .add(driverTask);
-    }
+    queryMap
+        .computeIfAbsent(
+            queryId,
+            v -> {
+              timeoutQueue.push(driverTask);
+              return new ConcurrentHashMap<>();
+            })
+        .computeIfAbsent(
+            driverTask.getDriverTaskId().getFragmentInstanceId(),
+            v -> Collections.synchronizedSet(new HashSet<>()))
+        .add(driverTask);
+  }
 
-    for (DriverTask task : tasks) {
-      task.lock();
-      try {
-        if (task.getStatus() != DriverTaskStatus.READY) {
-          continue;
-        }
-        readyQueue.push(task);
-        task.setLastEnterReadyQueueTime(System.nanoTime());
-      } finally {
-        task.unlock();
+  public void submitTaskToReadyQueue(DriverTask task) {
+    task.lock();
+    try {
+      if (task.getStatus() != DriverTaskStatus.READY) {
+        return;
       }
+      readyQueue.push(task);
+      task.setLastEnterReadyQueueTime(System.nanoTime());
+    } finally {
+      task.unlock();
     }
   }
 
@@ -448,6 +474,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.FINISHED);
         clearDriverTask(task);
+        task.submitDependencyDriver();
       } finally {
         task.unlock();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index e008b84210..b4c501a158 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.DriverTa
 import org.apache.iotdb.db.mpp.execution.schedule.queue.multilevelqueue.Priority;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.airlift.units.Duration;
 
 import java.util.Comparator;
@@ -58,6 +59,8 @@ public class DriverTask implements IDIndexedAccessible {
   private long lastEnterReadyQueueTime;
   private long lastEnterBlockQueueTime;
 
+  private SettableFuture<?> blockedDependencyDriver = SettableFuture.create();
+
   /** Initialize a dummy instance for queryHolder */
   public DriverTask() {
     this(new StubFragmentInstance(), 0L, null, null);
@@ -137,6 +140,14 @@ public class DriverTask implements IDIndexedAccessible {
     this.abortCause = abortCause;
   }
 
+  public void submitDependencyDriver() {
+    this.blockedDependencyDriver.set(null);
+  }
+
+  public SettableFuture<?> getBlockedDependencyDriver() {
+    return blockedDependencyDriver;
+  }
+
   public Priority getPriority() {
     return priority.get();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 32988da5ca..3132783e3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -98,6 +98,7 @@ public class LocalExecutionPlanContext {
     this.dataRegionTTL = parentContext.dataRegionTTL;
     this.nextPipelineId = parentContext.nextPipelineId;
     this.pipelineDriverFactories = parentContext.pipelineDriverFactories;
+    this.degreeOfParallelism = parentContext.degreeOfParallelism;
     this.exchangeSumNum = parentContext.exchangeSumNum;
     this.exchangeOperatorList = parentContext.exchangeOperatorList;
     this.cachedDataTypes = parentContext.cachedDataTypes;
@@ -139,6 +140,10 @@ public class LocalExecutionPlanContext {
     return pipelineDriverFactories;
   }
 
+  public int getPipelineNumber() {
+    return nextPipelineId.get();
+  }
+
   public DriverContext getDriverContext() {
     return driverContext;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index ad2a5163ce..fd54e09b2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2303,16 +2303,84 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
   private List<Operator> dealWithConsumeChildrenOneByOneNode(
       PlanNode node, LocalExecutionPlanContext context) {
+    List<Operator> parentPipelineChildren = new ArrayList<>();
     int originExchangeNum = context.getExchangeSumNum();
     int finalExchangeNum = context.getExchangeSumNum();
-    List<Operator> children = new ArrayList<>();
-    for (PlanNode childSource : node.getChildren()) {
-      Operator childOperation = childSource.accept(this, context);
-      finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum());
-      context.setExchangeSumNum(originExchangeNum);
-      children.add(childOperation);
+
+    // 1. divide every child to pipeline using the max dop
+    if (context.getDegreeOfParallelism() == 1) {
+      // If dop = 1, we don't create extra pipeline
+      for (PlanNode childSource : node.getChildren()) {
+        Operator childOperation = childSource.accept(this, context);
+        finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum());
+        context.setExchangeSumNum(originExchangeNum);
+        parentPipelineChildren.add(childOperation);
+      }
+    } else {
+      List<Integer> childPipelineNums = new ArrayList<>();
+      int sumOfChildPipelines = 0;
+      int dependencyChildNode = 0, dependencyPipeId = 0;
+      for (PlanNode childNode : node.getChildren()) {
+        if (childNode instanceof ExchangeNode) {
+          Operator childOperation = childNode.accept(this, context);
+          finalExchangeNum = Math.max(finalExchangeNum, context.getExchangeSumNum());
+          context.setExchangeSumNum(originExchangeNum);
+          parentPipelineChildren.add(childOperation);
+        } else {
+          LocalExecutionPlanContext subContext = context.createSubContext();
+          // Only context.getDegreeOfParallelism() - 1 can be allocated to child
+          int dopForChild = context.getDegreeOfParallelism() - 1;
+          subContext.setDegreeOfParallelism(dopForChild);
+          int originPipeNum = context.getPipelineNumber();
+          Operator childOperation = childNode.accept(this, subContext);
+          ISinkHandle localSinkHandle =
+              MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
+                  // Attention, there is no parent node, use first child node instead
+                  context.getDriverContext(), childNode.getPlanNodeId().getId());
+          subContext.setSinkHandle(localSinkHandle);
+          subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
+
+          int curChildPipelineNum = subContext.getPipelineNumber() - originPipeNum;
+          childPipelineNums.add(curChildPipelineNum);
+          sumOfChildPipelines += curChildPipelineNum;
+          // If sumOfChildPipelines > dopForChild, wait for earlier children: slide the window
+          // forward, recording the first pipeline of the last evicted child as the dependency.
+          // Never evict the current child: its pipelines would then depend on themselves.
+          int lastChildIndex = childPipelineNums.size() - 1;
+          while (sumOfChildPipelines > dopForChild && dependencyChildNode < lastChildIndex) {
+            dependencyPipeId = context.getPipelineNumber() - sumOfChildPipelines;
+            sumOfChildPipelines -= childPipelineNums.get(dependencyChildNode);
+            dependencyChildNode++;
+          }
+          // Add dependency for all pipelines under current node
+          if (dependencyChildNode != 0) {
+            for (int i = originPipeNum; i < subContext.getPipelineNumber(); i++) {
+              context.getPipelineDriverFactories().get(i).setDependencyPipeline(dependencyPipeId);
+            }
+          }
+
+          ExchangeOperator sourceOperator =
+              new ExchangeOperator(
+                  context
+                      .getDriverContext()
+                      .addOperatorContext(
+                          context.getNextOperatorId(),
+                          null,
+                          ExchangeOperator.class.getSimpleName()),
+                  MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
+                      ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+                      context.getDriverContext()),
+                  childNode.getPlanNodeId());
+          context
+              .getTimeSliceAllocator()
+              .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
+          parentPipelineChildren.add(sourceOperator);
+          context.addExchangeOperator(sourceOperator);
+          finalExchangeNum = Math.max(finalExchangeNum, subContext.getExchangeSumNum() + 1);
+        }
+      }
     }
     context.setExchangeSumNum(finalExchangeNum);
-    return children;
+    return parentPipelineChildren;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index ec5eb7a3e2..fc21ed5c59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -31,6 +31,7 @@ public class PipelineDriverFactory {
   private final DriverContext driverContext;
   // TODO Use OperatorFactory to replace operator to generate multiple drivers for on pipeline
   private final Operator operation;
+  private int dependencyPipelineIndex = -1;
 
   public PipelineDriverFactory(Operator operation, DriverContext driverContext) {
     this.operation = requireNonNull(operation, "rootOperator is null");
@@ -44,7 +45,11 @@ public class PipelineDriverFactory {
   public Driver createDriver() {
     requireNonNull(driverContext, "driverContext is null");
     try {
-      return new DataDriver(operation, driverContext);
+      Driver driver = new DataDriver(operation, driverContext);
+      if (dependencyPipelineIndex != -1) {
+        driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
+      }
+      return driver;
     } catch (Throwable failure) {
       try {
         operation.close();
@@ -56,4 +61,8 @@ public class PipelineDriverFactory {
       throw failure;
     }
   }
+
+  public void setDependencyPipeline(int dependencyPipelineIndex) {
+    this.dependencyPipelineIndex = dependencyPipelineIndex;
+  }
 }