You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:01:00 UTC

[iotdb] 10/13: Add consumeAll unit tests

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 082fd7d0b7d49e87fc5cc75a76f5695efa817a05
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Feb 10 17:00:13 2023 +0800

    Add consumeAll unit tests
---
 .../process/join/RowBasedTimeJoinOperator.java     |   6 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |   4 +-
 .../db/mpp/plan/plan/PipelineBuilderTest.java      | 322 +++++++++++++++++++++
 3 files changed, 330 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 50cccb9573..79f9905ba8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.join;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AbstractProcessOperator;
@@ -290,6 +291,11 @@ public class RowBasedTimeJoinOperator extends AbstractProcessOperator {
     timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
   }
 
+  @TestOnly
+  public List<Operator> getChildren() {
+    return children;
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 21156e3f82..dff2e57965 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2211,7 +2211,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             node.getPathPatternList(), node.getTemplateId()));
   }
 
-  private List<Operator> dealWithConsumeAllChildrenPipelineBreaker(
+  public List<Operator> dealWithConsumeAllChildrenPipelineBreaker(
       PlanNode node, LocalExecutionPlanContext context) {
     // children after pipelining
     LinkedList<Operator> parentPipelineChildren = new LinkedList<>();
@@ -2325,7 +2325,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return childNumInEachPipeline;
   }
 
-  private List<Operator> dealWithConsumeChildrenOneByOneNode(
+  public List<Operator> dealWithConsumeChildrenOneByOneNode(
       PlanNode node, LocalExecutionPlanContext context) {
     List<Operator> parentPipelineChildren = new ArrayList<>();
     int originExchangeNum = context.getExchangeSumNum();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
new file mode 100644
index 0000000000..2f487ec69d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanContext;
+import org.apache.iotdb.db.mpp.plan.planner.OperatorTreeGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+
+public class PipelineBuilderTest {
+
+  OperatorTreeGenerator operatorTreeGenerator = new OperatorTreeGenerator();
+
+  /**
+   * The operator structure is [TimeJoin1 - [SeriesScan0,SeriesScan1,SeriesScan2,SeriesScan3]].
+   *
+   * <p>The next six tests, I will test this TimeJoinOperator with different dop.
+   *
+   * <p>The first test will test dop = 1. Expected result is that no child pipelines will be
+   * divided.
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder1() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(1);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context);
+    assertEquals(0, context.getPipelineNumber());
+    assertEquals(4, childrenOperator.size());
+    assertEquals(4, timeJoinNode.getChildren().size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(SeriesScanOperator.class, childrenOperator.get(i).getClass());
+      assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i),
+          timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+  }
+
+  /**
+   * This test will test dop = 2. Expected result is two pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [SeriesScan1, SeriesScan0, ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - TimeJoin1-1[SeriesScan2, SeriesScan3].
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder2() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(2);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context);
+    // The number of pipeline is 1, since parent pipeline hasn't joined
+    assertEquals(1, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(3, childrenOperator.size());
+    assertEquals(3, timeJoinNode.getChildren().size());
+    for (int i = 0; i < 2; i++) {
+      assertEquals(SeriesScanOperator.class, childrenOperator.get(i).getClass());
+      assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass());
+    }
+    assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+
+    // Validate the changes of node structure
+    assertEquals("root.sg.d1.s1", timeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+    assertEquals("root.sg.d0.s1", timeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+    assertEquals(TimeJoinNode.class, timeJoinNode.getChildren().get(2).getClass());
+
+    // Validate the second pipeline
+    TimeJoinNode subTimeJoinNode = (TimeJoinNode) timeJoinNode.getChildren().get(2);
+    assertEquals(2, subTimeJoinNode.getChildren().size());
+    assertEquals(
+        "root.sg.d2.s1", subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+    assertEquals(
+        "root.sg.d3.s1", subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+  }
+
+  /**
+   * This test will test dop = 3. Expected result is three pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [SeriesScan0, ExchangeOperator, ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The third is: ExchangeOperator - TimeJoin1-1[SeriesScan2, SeriesScan3].
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder3() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(3);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context);
+    // The number of pipeline is 2, since parent pipeline hasn't joined
+    assertEquals(2, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(3, childrenOperator.size());
+    assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+
+    // Validate the changes of node structure
+    assertEquals(3, timeJoinNode.getChildren().size());
+    assertEquals("root.sg.d0.s1", timeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+    assertEquals("root.sg.d1.s1", timeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+    assertEquals(TimeJoinNode.class, timeJoinNode.getChildren().get(2).getClass());
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(1);
+    assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId());
+
+    // Validate the third pipeline
+    TimeJoinNode subTimeJoinNode = (TimeJoinNode) timeJoinNode.getChildren().get(2);
+    assertEquals(2, subTimeJoinNode.getChildren().size());
+    assertEquals(
+        "root.sg.d2.s1", subTimeJoinNode.getChildren().get(0).getOutputColumnNames().get(0));
+    assertEquals(
+        "root.sg.d3.s1", subTimeJoinNode.getChildren().get(1).getOutputColumnNames().get(0));
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(2);
+    assertEquals(exchangeOperator2.getSourceId(), subTimeJoinNode.getPlanNodeId());
+  }
+
+  /**
+   * This test will test dop = 4. Expected result is four pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [SeriesScan0, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The forth is: ExchangeOperator - SeriesScan3.
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder4() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(4);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context);
+    // The number of pipeline is 3, since parent pipeline hasn't joined
+    assertEquals(3, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    assertEquals(SeriesScanOperator.class, childrenOperator.get(0).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(1).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(2).getClass());
+    assertEquals(ExchangeOperator.class, childrenOperator.get(3).getClass());
+
+    // Validate the changes of node structure
+    assertEquals(4, timeJoinNode.getChildren().size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i),
+          timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(1);
+    assertEquals("SeriesScanNode1", exchangeOperator1.getSourceId().getId());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(2);
+    assertEquals("SeriesScanNode2", exchangeOperator2.getSourceId().getId());
+
+    // Validate the forth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(3);
+    assertEquals("SeriesScanNode3", exchangeOperator3.getSourceId().getId());
+  }
+
+  /**
+   * This test will test dop = 4. Expected result is five pipelines:
+   *
+   * <p>The first is: TimeJoin1 - [ExchangeOperator, ExchangeOperator, ExchangeOperator,
+   * ExchangeOperator];
+   *
+   * <p>The second is: ExchangeOperator - SeriesScan0.
+   *
+   * <p>The third is: ExchangeOperator - SeriesScan1.
+   *
+   * <p>The forth is: ExchangeOperator - SeriesScan2.
+   *
+   * <p>The fifth is: ExchangeOperator - SeriesScan3.
+   */
+  @Test
+  public void testConsumeAllChildrenPipelineBuilder5() throws IllegalPathException {
+    TypeProvider typeProvider = new TypeProvider();
+    TimeJoinNode timeJoinNode = initTimeJoinNode(typeProvider, 4);
+    LocalExecutionPlanContext context = createLocalExecutionPlanContext(typeProvider);
+    context.setDegreeOfParallelism(5);
+
+    List<Operator> childrenOperator =
+        operatorTreeGenerator.dealWithConsumeAllChildrenPipelineBreaker(timeJoinNode, context);
+    // The number of pipeline is 4, since parent pipeline hasn't joined
+    assertEquals(4, context.getPipelineNumber());
+
+    // Validate the first pipeline
+    assertEquals(4, childrenOperator.size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(ExchangeOperator.class, childrenOperator.get(i).getClass());
+    }
+
+    // Validate the changes of node structure
+    assertEquals(4, timeJoinNode.getChildren().size());
+    for (int i = 0; i < 4; i++) {
+      assertEquals(SeriesScanNode.class, timeJoinNode.getChildren().get(i).getClass());
+      assertEquals(
+          String.format("root.sg.d%d.s1", i),
+          timeJoinNode.getChildren().get(i).getOutputColumnNames().get(0));
+    }
+
+    // Validate the second pipeline
+    ExchangeOperator exchangeOperator1 = (ExchangeOperator) childrenOperator.get(0);
+    assertEquals("SeriesScanNode0", exchangeOperator1.getSourceId().getId());
+
+    // Validate the third pipeline
+    ExchangeOperator exchangeOperator2 = (ExchangeOperator) childrenOperator.get(1);
+    assertEquals("SeriesScanNode1", exchangeOperator2.getSourceId().getId());
+
+    // Validate the forth pipeline
+    ExchangeOperator exchangeOperator3 = (ExchangeOperator) childrenOperator.get(2);
+    assertEquals("SeriesScanNode2", exchangeOperator3.getSourceId().getId());
+
+    // Validate the fifth pipeline
+    ExchangeOperator exchangeOperator4 = (ExchangeOperator) childrenOperator.get(3);
+    assertEquals("SeriesScanNode3", exchangeOperator4.getSourceId().getId());
+  }
+
+  private LocalExecutionPlanContext createLocalExecutionPlanContext(TypeProvider typeProvider) {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+    DataRegion dataRegion = Mockito.mock(DataRegion.class);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    fragmentInstanceContext.setDataRegion(dataRegion);
+
+    return new LocalExecutionPlanContext(typeProvider, fragmentInstanceContext);
+  }
+
+  /**
+   * This method will init a timeJoinNode with @childNum seriesScanNode as children.
+   *
+   * @param childNum the number of children
+   * @return a timeJoinNode with @childNum seriesScanNode as children
+   */
+  private TimeJoinNode initTimeJoinNode(TypeProvider typeProvider, int childNum)
+      throws IllegalPathException {
+    TimeJoinNode timeJoinNode = new TimeJoinNode(new PlanNodeId("TimeJoinNode"), Ordering.ASC);
+    for (int i = 0; i < childNum; i++) {
+      SeriesScanNode seriesScanNode =
+          new SeriesScanNode(
+              new PlanNodeId(String.format("SeriesScanNode%d", i)),
+              new MeasurementPath(String.format("root.sg.d%d.s1", i), TSDataType.INT32));
+      typeProvider.setType(seriesScanNode.getSeriesPath().toString(), TSDataType.INT32);
+      timeJoinNode.addChild(seriesScanNode);
+    }
+    return timeJoinNode;
+  }
+}