You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:01:01 UTC

[iotdb] 11/13: Add consumeOneByOne unit tests

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5f207d6febb655bafa3e0f8e0063e00d518c5285
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Feb 10 17:42:59 2023 +0800

    Add consumeOneByOne unit tests
---
 .../db/mpp/plan/planner/PipelineDriverFactory.java |   4 +
 .../db/mpp/plan/plan/PipelineBuilderTest.java      | 376 ++++++++++++++++++++-
 2 files changed, 379 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index e71107b7af..8cc9ec0e51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -73,4 +73,8 @@ public class PipelineDriverFactory {
   public void setDependencyPipeline(int dependencyPipelineIndex) {
     this.dependencyPipelineIndex = dependencyPipelineIndex;
   }
+
+  public int getDependencyPipelineIndex() {
+    return dependencyPipelineIndex; // the index stored by setDependencyPipeline(int)
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
index 2f487ec69d..29c84d10e0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.plan;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -29,13 +30,16 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanContext;
 import org.apache.iotdb.db.mpp.plan.planner.OperatorTreeGenerator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -226,7 +230,7 @@ public class PipelineBuilderTest {
   }
 
   /**
-   * This test will test dop = 4. Expected result is five pipelines:
+   * This test will test dop = 5. Expected result is five pipelines:
    *
    * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
    * ExchangeOperator];
@@ -283,6 +287,356 @@ public class PipelineBuilderTest {
     assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId());
   }
 
+  /**
+   * This test will test dop = 6. Expected result is still five pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3.
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder6() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(6);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context);
+    // The number of pipelines is 4, since the parent pipeline hasn't been added yet
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the changes of node structure
+    assertEquals(4, timeJoinNode.getChildren().size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i),
+          timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+    assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+    assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId());
+
+    // Validate the fourth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+    assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId());
+
+    // Validate the fifth pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+    assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId());
+  }
+
+  /**
+   * The operator structure is [DeviceView - [SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]].
+   *
+   * <p>The next six tests exercise this DeviceViewOperator with different dop values.
+   *
+   * <p>The first test will test dop = 1. Expected result is that no child pipelines will be
+   * divided.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder1() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(1);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+    assertEquals(0, context.getPipelineNumber()); // dop = 1, so no child pipeline is expected
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(AlignedSeriesScanOperator.class, childrenOperator.get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i),
+          deviceViewNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+  }
+
+  /**
+   * This test will test dop = 2. Expected result is five pipelines with dependencies:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1, which depends on the second pipeline.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2, which depends on the third pipeline.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3, which depends on the fourth pipeline.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder2() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(2);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+    // The number of pipelines is 4, since the parent pipeline hasn't been added yet
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the second pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+    assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the third pipeline: it depends on factory 0, i.e. the second pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+    assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId());
+    assertEquals(0, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the fourth pipeline: it depends on factory 1, i.e. the third pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+    assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId());
+    assertEquals(1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the fifth pipeline: it depends on factory 2, i.e. the fourth pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+    assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId());
+    assertEquals(2, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+  }
+
+  /**
+   * This test will test dop = 3. Expected result is five pipelines with dependencies:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2, which depends on the second pipeline.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3, which depends on the third pipeline.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder3() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(3);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+    // The number of pipelines is 4, since the parent pipeline hasn't been added yet
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the second pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+    assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the third pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+    assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the fourth pipeline: it depends on factory 0, i.e. the second pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+    assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId());
+    assertEquals(0, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the fifth pipeline: it depends on factory 1, i.e. the third pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+    assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId());
+    assertEquals(1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+  }
+
+  /**
+   * This test will test dop = 4. Expected result is five pipelines with one dependency:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3, which depends on the second pipeline.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder4() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(4);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+    // The number of pipelines is 4, since the parent pipeline hasn't been added yet
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the second pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+    assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the third pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+    assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the fourth pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+    assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the fifth pipeline: it depends on factory 0, i.e. the second pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+    assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId());
+    assertEquals(0, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+  }
+
+  /**
+   * This test will test dop = 5. Expected result is five pipelines without dependency:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder5() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(5);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+    // The number of pipelines is 4, since the parent pipeline hasn't been added yet
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the second pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+    assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the third pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+    assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the fourth pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+    assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the fifth pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+    assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+  }
+
+  /**
+   * This test will test dop = 6. Expected result is still five pipelines without dependency:
+   *
+   * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The fourth is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3.
+   */
+  @Test
+  public void testConsumeOneByOneChildrenPipelineBuilder6() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(6);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+    // The number of pipelines is 4, since the parent pipeline hasn't been added yet
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the second pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+    assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+    // Validate the third pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+    assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+    // Validate the fourth pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+    assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+    // Validate the fifth pipeline: no dependency, so the index is expected to be -1
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+    assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId());
+    assertEquals(-1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+  }
+
   private LocalExecutionPlanContext createLocalExecutionPlanContext(TypeProvider typeProvider) {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
@@ -319,4 +673,24 @@ public class PipelineBuilderTest {
     }
     return timeJoinNode;
   }
+
+  /**
+   * Builds a DeviceViewNode with childNum AlignedSeriesScanNode children (root.sg.d0.s1, ...).
+   *
+   * @param childNum the number of children to create
+   * @return the DeviceViewNode holding {@code childNum} AlignedSeriesScanNode children
+   */
+  private DeviceViewNode initDeviceViewNode(TypeProvider typeProvider, int childNum)
+      throws IllegalPathException {
+    PlanNodeId viewId = new PlanNodeId("DeviceViewNode");
+    DeviceViewNode root = new DeviceViewNode(viewId, null, null, null);
+    int index = 0;
+    while (index < childNum) {
+      PlanNodeId scanId = new PlanNodeId(String.format("AlignedSeriesScanNode%d", index));
+      AlignedPath device = new AlignedPath(String.format("root.sg.d%d", index), "s1");
+      root.addChild(new AlignedSeriesScanNode(scanId, device));
+      index++;
+    }
+    return root;
+  }
 }