You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:01:01 UTC
[iotdb] 11/13: Add consumeOneByOne unit tests
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5f207d6febb655bafa3e0f8e0063e00d518c5285
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Feb 10 17:42:59 2023 +0800
Add consumeOneByOne unit tests
---
.../db/mpp/plan/planner/PipelineDriverFactory.java | 4 +
.../db/mpp/plan/plan/PipelineBuilderTest.java | 376 ++++++++++++++++++++-
2 files changed, 379 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index e71107b7af..8cc9ec0e51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -73,4 +73,8 @@ public class PipelineDriverFactory {
public void setDependencyPipeline(int dependencyPipelineIndex) {
this.dependencyPipelineIndex = dependencyPipelineIndex;
}
+
+ public int getDependencyPipelineIndex() {
+ return dependencyPipelineIndex;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
index 2f487ec69d..29c84d10e0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.plan;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -29,13 +30,16 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanContext;
import org.apache.iotdb.db.mpp.plan.planner.OperatorTreeGenerator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -226,7 +230,7 @@ public class PipelineBuilderTest {
}
/**
- * This test will test dop = 4. Expected result is five pipelines:
+ * This test will test dop = 5. Expected result is five pipelines:
*
* <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
* ExchangeOperator];
@@ -283,6 +287,356 @@ public class PipelineBuilderTest {
assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId());
}
+ /**
+ * This test will test dop = 6. Expected result is still five pipelines:
+ *
+ * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The forth is: ExchangeOperator - SeriesScan2.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3.
+ */
+ @Test
+ public void testConsumeAllChildrenPipelineBuilder6() throws IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+ LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(6);
+
+ List<Operator> childrenOperator =
+ operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the changes of node structure
+ assertEquals(4, timeJoinNode.getChildren().size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass());
+ assertEquals(
+ String.format("root.sg.d%d.s1", i),
+ timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+ assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+ assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId());
+
+ // Validate the forth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+ assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+ assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId());
+ }
+
+ /**
+ * The operator structure is [DeviceView - [SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]].
+ *
+ * <p>The next six tests, I will test this DeviceViewOperator with different dop.
+ *
+ * <p>The first test will test dop = 1. Expected result is that no child pipelines will be
+ * divided.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder1() throws IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(1);
+
+ List<Operator> childrenOperator =
+ operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+ assertEquals(0, context.getPipelineNumber());
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(AlignedSeriesScanOperator.class, childrenOperator.get(i).getClass());
+ assertEquals(
+ String.format("root.sg.d%d.s1", i),
+ deviceViewNode.getChildren().get(i).getOutputColumnNames().get(0));
+ }
+ }
+
+ /**
+ * This test will test dop = 2. Expected result is five pipelines with dependency:
+ *
+ * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1, which has dependency second pipeline.
+ *
+ * <p>The forth is: ExchangeOperator - SeriesScan2, which has dependency third pipeline.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency forth pipeline.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder2() throws IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(2);
+
+ List<Operator> childrenOperator =
+ operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+ assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+ assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId());
+ assertEquals(0, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the forth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+ assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId());
+ assertEquals(1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+ assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId());
+ assertEquals(2, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+ }
+
+ /**
+ * This test will test dop = 3. Expected result is five pipelines with dependency:
+ *
+ * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The forth is: ExchangeOperator - SeriesScan2, which has dependency second pipeline.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency third pipeline.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder3() throws IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(3);
+
+ List<Operator> childrenOperator =
+ operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+ assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+ assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the forth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+ assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId());
+ assertEquals(0, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+ assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId());
+ assertEquals(1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+ }
+
+ /**
+ * This test will test dop = 4. Expected result is five pipelines with dependency:
+ *
+ * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The forth is: ExchangeOperator - SeriesScan2.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3, which has dependency second pipeline.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder4() throws IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(4);
+
+ List<Operator> childrenOperator =
+ operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+ assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+ assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the forth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+ assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+ assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId());
+ assertEquals(0, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+ }
+
+ /**
+ * This test will test dop = 5. Expected result is five pipelines without dependency:
+ *
+ * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The forth is: ExchangeOperator - SeriesScan2.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder5() throws IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(5);
+
+ List<Operator> childrenOperator =
+ operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+ assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+ assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the forth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+ assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+ assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+ }
+
+ /**
+ * This test will test dop = 5. Expected result is five pipelines without dependency:
+ *
+ * <p>The first is: DeviceView - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+ * ExchangeOperator];
+ *
+ * <p>The second is: ExchangeOperator - SeriesScan0.
+ *
+ * <p>The third is: ExchangeOperator - SeriesScan1.
+ *
+ * <p>The forth is: ExchangeOperator - SeriesScan2.
+ *
+ * <p>The fifth is: ExchangeOperator - SeriesScan3.
+ */
+ @Test
+ public void testConsumeOneByOneChildrenPipelineBuilder6() throws IllegalPathException {
+ TypeProvider typeProvider = new TypeProvider();
+ DeviceViewNode deviceViewNode = initDeviceViewNode(typeProvider, 4);
+ LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+ context.setDegreeOfParallelism(5);
+
+ List<Operator> childrenOperator =
+ operatorTreeGenerator.dealWithConsumeChildrenOneByOneNode(deviceViewNode, context);
+ // The number of pipeline is 4, since parent pipeline hasn't joined
+ assertEquals(4, context.getPipelineNumber());
+
+ // Validate the first pipeline
+ assertEquals(4, childrenOperator.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+ }
+
+ // Validate the second pipeline
+ ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+ assertEquals("AlignedSeriesScanNode0", exchangeOperator1.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(0).getDependencyPipelineIndex());
+
+ // Validate the third pipeline
+ ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+ assertEquals("AlignedSeriesScanNode1", exchangeOperator2.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(1).getDependencyPipelineIndex());
+
+ // Validate the forth pipeline
+ ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+ assertEquals("AlignedSeriesScanNode2", exchangeOperator3.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(2).getDependencyPipelineIndex());
+
+ // Validate the fifth pipeline
+ ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+ assertEquals("AlignedSeriesScanNode3", exchangeOperator4.getSourceId().getId());
+ assertEquals(-1, context.getPipelineDriverFactories().get(3).getDependencyPipelineIndex());
+ }
+
private LocalExecutionPlanContext createLocalExecutionPlanContext(TypeProvider typeProvider) {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
@@ -319,4 +673,24 @@ public class PipelineBuilderTest {
}
return timeJoinNode;
}
+
+ /**
+ * This method will init a DeviceViewNode with @childNum alignedSeriesScanNode as children.
+ *
+ * @param childNum the number of children
+ * @return a DeviceViewNode with @childNum alignedSeriesScanNode as children
+ */
+ private DeviceViewNode initDeviceViewNode(TypeProvider typeProvider, int childNum)
+ throws IllegalPathException {
+ DeviceViewNode deviceViewNode =
+ new DeviceViewNode(new PlanNodeId("DeviceViewNode"), null, null, null);
+ for (int i = 0; i < childNum; i++) {
+ AlignedSeriesScanNode alignedSeriesScanNode =
+ new AlignedSeriesScanNode(
+ new PlanNodeId(String.format("AlignedSeriesScanNode%d", i)),
+ new AlignedPath(String.format("root.sg.d%d", i), "s1"));
+ deviceViewNode.addChild(alignedSeriesScanNode);
+ }
+ return deviceViewNode;
+ }
}