You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:00:51 UTC

[iotdb] 01/13: implement consumeAllChildren pipeline divided by dop

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8051908a265c74c351d1c6d7b9da6fdc1008248b
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Feb 8 11:29:18 2023 +0800

    implement consumeAllChildren pipeline divided by dop
---
 .../plan/planner/LocalExecutionPlanContext.java    |  9 +++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 79 ++++++++++++++++++----
 .../db/mpp/plan/planner/plan/node/PlanNode.java    | 13 ++++
 .../planner/plan/node/process/AggregationNode.java | 16 ++++-
 .../planner/plan/node/process/DeviceMergeNode.java |  9 +++
 .../plan/node/process/GroupByLevelNode.java        | 10 +++
 .../planner/plan/node/process/GroupByTagNode.java  | 15 +++-
 .../planner/plan/node/process/MergeSortNode.java   |  9 +++
 .../planner/plan/node/process/TimeJoinNode.java    |  8 +++
 .../plan/node/process/VerticallyConcatNode.java    |  6 ++
 10 files changed, 156 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 2d8e6d7051..32988da5ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -59,6 +59,9 @@ public class LocalExecutionPlanContext {
   private final AtomicInteger nextOperatorId;
   private final TypeProvider typeProvider;
   private final Map<String, Set<String>> allSensorsMap;
+  // Degree of parallelism for splitting nodes into pipelines; 1 means no extra pipelines.
+  // TODO: the default of 4 is hard-coded here and should probably come from configuration.
+  private int degreeOfParallelism = 4;
   // this is shared with all subContexts
   private AtomicInteger nextPipelineId;
   private List<PipelineDriverFactory> pipelineDriverFactories;
@@ -142,6 +145,25 @@ public class LocalExecutionPlanContext {
     return driverContext;
   }
 
+  /** Returns the degree of parallelism used when splitting nodes into pipelines. */
+  public int getDegreeOfParallelism() {
+    return degreeOfParallelism;
+  }
+
+  /**
+   * Sets the degree of parallelism used when splitting nodes into pipelines.
+   *
+   * @param degreeOfParallelism the new degree of parallelism, must be at least 1
+   * @throws IllegalArgumentException if {@code degreeOfParallelism} is less than 1
+   */
+  public void setDegreeOfParallelism(int degreeOfParallelism) {
+    if (degreeOfParallelism < 1) {
+      throw new IllegalArgumentException(
+          "degreeOfParallelism should be at least 1, but was " + degreeOfParallelism);
+    }
+    this.degreeOfParallelism = degreeOfParallelism;
+  }
+
   private int getNextPipelineId() {
     return nextPipelineId.getAndIncrement();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index de21989478..d397db9980 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -216,7 +216,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -2211,22 +2213,74 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
   private List<Operator> dealWithConsumeAllChildrenPipelineBreaker(
       PlanNode node, LocalExecutionPlanContext context) {
     // children after pipelining
-    List<Operator> children = new ArrayList<>();
+    LinkedList<Operator> parentPipelineChildren = new LinkedList<>();
     int finalExchangeNum = context.getExchangeSumNum();
-    for (PlanNode childSource : node.getChildren()) {
-      // Create pipelines for children
-      LocalExecutionPlanContext subContext = context.createSubContext();
-      Operator childOperation = childSource.accept(this, subContext);
-      // If the child belongs to another fragment instance, we don't create pipeline for it
-      if (childOperation instanceof ExchangeOperator) {
-        children.add(childOperation);
+    // 1. ExchangeNodes stay in the parent pipeline, handle them first.
+    // NOTE(review): this removes them from the list returned by node.getChildren(), which
+    // presumably mutates the plan node; createSubNode's child indexes rely on that removal.
+    Iterator<PlanNode> childrenIterator = node.getChildren().listIterator();
+    while (childrenIterator.hasNext()) {
+      PlanNode childSource = childrenIterator.next();
+      if (childSource instanceof ExchangeNode) {
+        Operator childOperation = childSource.accept(this, context);
         finalExchangeNum += 1;
-      } else {
+        parentPipelineChildren.add(childOperation);
+        childrenIterator.remove();
+      }
+    }
+    List<PlanNode> localChildren = node.getChildren();
+    int dop = context.getDegreeOfParallelism();
+    if (dop == 1) {
+      // If dop = 1, we don't create extra pipeline
+      for (PlanNode localChild : localChildren) {
+        parentPipelineChildren.add(localChild.accept(this, context));
+      }
+    } else {
+      int localChildNum = localChildren.size();
+      // 2. Split localChildren into consecutive groups of childNumInEachPipeline children; the
+      // last group also takes the remainder so that no child is dropped.
+      int childNumInEachPipeline = Math.max(1, localChildNum / dop);
+      // If dop <= localChildNum, the first group stays in the parent pipeline and the other
+      // (dop - 1) groups get new pipelines. Otherwise every child gets its own pipeline and the
+      // spare dop (dop - localChildNum, since dop = 1 means serial) is passed to the children.
+      // No group is created when all children are ExchangeNodes.
+      boolean firstGroupInParentPipeline = dop <= localChildNum;
+      int groupNum = Math.min(dop, localChildNum);
+      int dopForChild = Math.max(1, dop - localChildNum);
+
+      for (int i = 0; i < groupNum; i++) {
+        int startIndex = i * childNumInEachPipeline;
+        int endIndex = i < groupNum - 1 ? startIndex + childNumInEachPipeline : localChildNum;
+        if (i == 0 && firstGroupInParentPipeline) {
+          // Insert at index j to keep the original child order at the head of the list
+          for (int j = startIndex; j < endIndex; j++) {
+            parentPipelineChildren.add(j, localChildren.get(j).accept(this, context));
+          }
+          continue;
+        }
+        LocalExecutionPlanContext subContext = context.createSubContext();
+        subContext.setDegreeOfParallelism(dopForChild);
+        // The root of the new pipeline: the child itself, or a partial parent node
+        PlanNode partialParentNode;
+        if (endIndex - startIndex == 1) {
+          partialParentNode = localChildren.get(startIndex);
+        } else {
+          // Same as node except that it covers only part of the children.
+          // Some createSubNode implementations already copy the children, others do not.
+          partialParentNode = node.createSubNode(i, startIndex, endIndex);
+          if (partialParentNode.getChildren().isEmpty()) {
+            for (int j = startIndex; j < endIndex; j++) {
+              partialParentNode.addChild(localChildren.get(j));
+            }
+          }
+        }
+        Operator partialParentOperator = partialParentNode.accept(this, subContext);
         ISinkHandle localSinkHandle =
             MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
-                subContext.getDriverContext(), childSource.getPlanNodeId().getId());
+                // Use the pipeline root's id, the same id the source handle below uses
+                subContext.getDriverContext(), partialParentNode.getPlanNodeId().getId());
         subContext.setSinkHandle(localSinkHandle);
-        subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
+        subContext.addPipelineDriverFactory(partialParentOperator, subContext.getDriverContext());
 
         ExchangeOperator sourceOperator =
             new ExchangeOperator(
@@ -2237,17 +2291,17 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
                     ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
                     context.getDriverContext()),
-                childSource.getPlanNodeId());
+                partialParentNode.getPlanNodeId());
         context
             .getTimeSliceAllocator()
             .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
-        children.add(sourceOperator);
+        parentPipelineChildren.add(sourceOperator);
         context.addExchangeOperator(sourceOperator);
         finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
       }
     }
     context.setExchangeSumNum(finalExchangeNum);
-    return children;
+    return parentPipelineChildren;
   }
 
   private List<Operator> dealWithConsumeChildrenOneByOneNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index 80ec99d1f3..ae66ddc843 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -65,6 +65,25 @@ public abstract class PlanNode implements IConsensusRequest {
   @Override
   public abstract PlanNode clone();
 
+  /**
+   * Creates a node with the same function as this node that covers only the children in the
+   * half-open range [startIndex, endIndex) of this node's children list.
+   *
+   * <p>NOTE(review): some implementations pass those children to the returned node while others
+   * return it without children, so callers should check {@link #getChildren()}.
+   *
+   * @param subNodeId index of the sub node, used by implementations to derive its plan node id
+   * @param startIndex index of the first covered child (inclusive)
+   * @param endIndex index of the last covered child (exclusive)
+   * @return the sub node
+   * @throws UnsupportedOperationException if this node type does not support it
+   */
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    throw new UnsupportedOperationException(
+        String.format("Can't Create subNode for %s", this.getClass().toString()));
+  }
+
   public PlanNode cloneWithChildren(List<PlanNode> children) {
     if (!(children == null
         || allowedChildCount() == CHILD_COUNT_NO_LIMIT
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 6c0c7703e8..69b8eb02db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -50,8 +50,7 @@ import java.util.stream.Collectors;
 public class AggregationNode extends MultiChildProcessNode {
 
   // The list of aggregate functions, each AggregateDescriptor will be output as one or two column
-  // of
-  // result TsBlock
+  // of result TsBlock
   protected List<AggregationDescriptor> aggregationDescriptorList;
 
   // The parameter of `group by time`.
@@ -167,6 +166,21 @@ public class AggregationNode extends MultiChildProcessNode {
         getScanOrder());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new AggregationNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        // Copy the ranges so that the sub node does not share this node's lists
+        new ArrayList<>(children.subList(startIndex, endIndex)),
+        // TODO: assumes one aggregation descriptor per child, which is not verified here
+        new ArrayList<>(getAggregationDescriptorList().subList(startIndex, endIndex)),
+        getGroupByTimeParameter(),
+        getGroupByParameter(),
+        getGroupByExpression(),
+        outputEndTime,
+        getScanOrder());
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     List<String> outputColumnNames = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index d3ec660e7f..c56d965c49 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -63,6 +63,15 @@ public class DeviceMergeNode extends MultiChildProcessNode {
     return new DeviceMergeNode(getPlanNodeId(), getMergeOrderParameter(), getDevices());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new DeviceMergeNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        getMergeOrderParameter(),
+        // TODO: assumes devices[i] corresponds to children[i]; children are not passed here
+        devices.subList(startIndex, endIndex));
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return children.stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index da7b8dc9c0..8959eae92b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -93,6 +93,16 @@ public class GroupByLevelNode extends MultiChildProcessNode {
         getPlanNodeId(), getGroupByLevelDescriptors(), this.groupByTimeParameter, this.scanOrder);
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new GroupByLevelNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        // TODO: assumes groupByLevelDescriptors[i] corresponds to children[i]; confirm
+        this.groupByLevelDescriptors.subList(startIndex, endIndex),
+        this.groupByTimeParameter,
+        this.scanOrder);
+  }
+
   public List<CrossSeriesAggregationDescriptor> getGroupByLevelDescriptors() {
     return groupByLevelDescriptors;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index c2e558bdc7..87580783f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -28,9 +28,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import org.apache.commons.lang3.Validate;
-
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.Validate;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -99,6 +98,18 @@ public class GroupByTagNode extends MultiChildProcessNode {
         this.outputColumnNames);
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new GroupByTagNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        this.groupByTimeParameter,
+        this.scanOrder,
+        // TODO: startIndex/endIndex are ignored, so every sub node keeps all tag aggregations
+        this.tagKeys,
+        this.tagValuesToAggregationDescriptors,
+        this.outputColumnNames);
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     List<String> ret = new ArrayList<>(tagKeys);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
index 564a5dbfa4..6c69a70722 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
@@ -54,6 +54,15 @@ public class MergeSortNode extends MultiChildProcessNode {
     return new MergeSortNode(getPlanNodeId(), getMergeOrderParameter(), outputColumns);
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new MergeSortNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        getMergeOrderParameter(),
+        // TODO: outputColumns is sliced by child index (throws if endIndex > its size); confirm
+        outputColumns.subList(startIndex, endIndex));
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return outputColumns;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index 2598cd4e28..2e89b841d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -62,6 +62,15 @@ public class TimeJoinNode extends MultiChildProcessNode {
     return new TimeJoinNode(getPlanNodeId(), getMergeOrder());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    // Copy the range so that the sub node does not share this node's children list
+    return new TimeJoinNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        getMergeOrder(),
+        new java.util.ArrayList<>(children.subList(startIndex, endIndex)));
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return children.stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
index 24760a4228..ab6ef28fda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
@@ -47,6 +47,13 @@ public class VerticallyConcatNode extends MultiChildProcessNode {
     return new VerticallyConcatNode(getPlanNodeId());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    // Created without children and ignoring the index range; callers are expected to add them
+    return new VerticallyConcatNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)));
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return children.stream()