You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/06 00:37:59 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0] [IOTDB-4902]Optimize process logic for aggregation when there is only one data region
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new cbf72915ff [To rel/1.0] [IOTDB-4902]Optimize process logic for aggregation when there is only one data region
cbf72915ff is described below
commit cbf72915fffceafb6ab2f0261b66e561eadeb445
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Tue Dec 6 08:37:53 2022 +0800
[To rel/1.0] [IOTDB-4902]Optimize process logic for aggregation when there is only one data region
---
.../it/aggregation/IoTDBAggregationOptimizeIT.java | 181 +++++++++++++++
.../db/mpp/aggregation/ExtremeAccumulator.java | 20 +-
.../db/mpp/aggregation/FirstValueAccumulator.java | 28 ++-
.../db/mpp/aggregation/LastValueAccumulator.java | 27 ++-
.../db/mpp/aggregation/MaxValueAccumulator.java | 21 +-
.../db/mpp/aggregation/MinTimeAccumulator.java | 1 +
.../db/mpp/aggregation/MinValueAccumulator.java | 21 +-
.../iotdb/db/mpp/aggregation/SumAccumulator.java | 1 +
.../process/join/VerticallyConcatOperator.java | 207 +++++++++++++++++
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 27 ++-
.../planner/distribution/ExchangeNodeAdder.java | 7 +
.../plan/planner/distribution/SourceRewriter.java | 248 ++++++++++++++-------
.../plan/planner/plan/node/PlanGraphPrinter.java | 8 +
.../mpp/plan/planner/plan/node/PlanNodeType.java | 6 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 5 +
.../plan/node/process/VerticallyConcatNode.java | 103 +++++++++
.../planner/plan/parameter/AggregationStep.java | 45 ++--
.../operator/VerticallyConcatOperatorTest.java | 197 ++++++++++++++++
.../distribution/AggregationDistributionTest.java | 70 +++++-
19 files changed, 1102 insertions(+), 121 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationOptimizeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationOptimizeIT.java
new file mode 100644
index 0000000000..05b4a84326
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationOptimizeIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualWithDescOrderTest;
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+import static org.apache.iotdb.itbase.constant.TestConstant.avg;
+import static org.apache.iotdb.itbase.constant.TestConstant.count;
+import static org.apache.iotdb.itbase.constant.TestConstant.extreme;
+import static org.apache.iotdb.itbase.constant.TestConstant.firstValue;
+import static org.apache.iotdb.itbase.constant.TestConstant.lastValue;
+import static org.apache.iotdb.itbase.constant.TestConstant.maxTime;
+import static org.apache.iotdb.itbase.constant.TestConstant.maxValue;
+import static org.apache.iotdb.itbase.constant.TestConstant.minTime;
+import static org.apache.iotdb.itbase.constant.TestConstant.minValue;
+import static org.apache.iotdb.itbase.constant.TestConstant.sum;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBAggregationOptimizeIT {
+ private static final String[] SQLs =
+ new String[] {
+ "CREATE DATABASE root.test",
+ "CREATE TIMESERIES root.test.1region_d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.test.1region_d1.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.test.1region_d2.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.test.1region_d2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.test.2region_d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.test.2region_d1.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.test.2region_d2.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.test.2region_d2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "INSERT INTO root.test.1region_d1(timestamp,s1,s2) values(1, 1, 1)",
+ "INSERT INTO root.test.1region_d2(timestamp,s1,s2) values(1, 1, 1)",
+ "INSERT INTO root.test.1region_d1(timestamp,s1,s2) values(2, 2, 2)",
+ "INSERT INTO root.test.1region_d2(timestamp,s1,s2) values(2, 2, 2)",
+ "INSERT INTO root.test.2region_d1(timestamp,s1,s2) values(1, 1, 1)",
+ "INSERT INTO root.test.2region_d2(timestamp,s1,s2) values(1, 1, 1)",
+ "INSERT INTO root.test.2region_d1(timestamp,s1,s2) values(1000000000000, 1, 1)",
+ "INSERT INTO root.test.2region_d2(timestamp,s1,s2) values(1000000000000, 1, 1)",
+ "flush",
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initBeforeClass();
+ prepareData(SQLs);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanAfterClass();
+ }
+
+ @Test
+ public void testEachSeriesOneRegion() {
+ String[] expectedHeader =
+ new String[] {
+ count("root.test.1region_d1.s1"),
+ count("root.test.1region_d2.s1"),
+ count("root.test.1region_d1.s2"),
+ count("root.test.1region_d2.s2")
+ };
+ String[] retArray = new String[] {"2,2,2,2,"};
+ resultSetEqualWithDescOrderTest(
+ "select count(s1), count(s2) from root.test.1region_d1, root.test.1region_d2",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testSomeSeriesOneRegion() {
+ String[] expectedHeader =
+ new String[] {count("root.test.1region_d1.s1"), count("root.test.2region_d2.s1")};
+ String[] retArray = new String[] {"2,2,"};
+ resultSetEqualWithDescOrderTest(
+ "select count(s1) from root.test.1region_d1, root.test.2region_d2",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testAggregationResultConcatOneRegion() {
+ String[] expectedHeader =
+ new String[] {count("root.test.2region_d1.s1"), count("root.test.2region_d1.s2")};
+ String[] retArray = new String[] {"2,2,"};
+ resultSetEqualWithDescOrderTest(
+ "select count(s1),count(s2) from root.test.2region_d1", expectedHeader, retArray);
+ }
+
+ @Test
+ public void mixTest() {
+ String[] expectedHeader =
+ new String[] {
+ count("root.test.2region_d1.s1"),
+ count("root.test.2region_d2.s1"),
+ count("root.test.1region_d2.s1"),
+ count("root.test.1region_d1.s1"),
+ count("root.test.2region_d1.s2"),
+ count("root.test.2region_d2.s2"),
+ count("root.test.1region_d2.s2"),
+ count("root.test.1region_d1.s2")
+ };
+ String[] retArray = new String[] {"2,2,2,2,2,2,2,2,"};
+ resultSetEqualWithDescOrderTest(
+ "select count(s1),count(s2) from root.test.**", expectedHeader, retArray);
+
+ expectedHeader =
+ new String[] {
+ TIMESTAMP_STR,
+ sum("root.test.2region_d1.s1"),
+ sum("root.test.2region_d2.s1"),
+ sum("root.test.1region_d2.s1"),
+ sum("root.test.1region_d1.s1"),
+ sum("root.test.2region_d1.s2"),
+ sum("root.test.2region_d2.s2"),
+ sum("root.test.1region_d2.s2"),
+ sum("root.test.1region_d1.s2")
+ };
+ retArray =
+ new String[] {
+ "1,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,", "2,null,null,2.0,2.0,null,null,2.0,2.0,"
+ };
+ resultSetEqualWithDescOrderTest(
+ "select sum(s1),sum(s2) from root.test.** group by ([1,3), 1ms)", expectedHeader, retArray);
+ }
+
+ @Test
+ public void testStaticAggregator() {
+ String[] expectedHeader =
+ new String[] {
+ count("root.test.1region_d1.s1"),
+ sum("root.test.1region_d1.s1"),
+ avg("root.test.1region_d1.s1"),
+ extreme("root.test.1region_d1.s1"),
+ maxValue("root.test.1region_d1.s1"),
+ minValue("root.test.1region_d1.s1"),
+ firstValue("root.test.1region_d1.s1"),
+ lastValue("root.test.1region_d1.s1"),
+ maxTime("root.test.1region_d1.s1"),
+ minTime("root.test.1region_d1.s1"),
+ count("root.test.2region_d1.s1")
+ };
+ String[] retArray = new String[] {"2,3.0,1.5,2,2,1,1,2,2,1,2,"};
+ resultSetEqualWithDescOrderTest(
+ "select count(1region_d1.s1),sum(1region_d1.s1),avg(1region_d1.s1),"
+ + "extreme(1region_d1.s1),max_value(1region_d1.s1),min_value(1region_d1.s1),"
+ + "first_value(1region_d1.s1),last_value(1region_d1.s1),max_time(1region_d1.s1),"
+ + "min_time(1region_d1.s1),count(2region_d1.s1) from root.test",
+ expectedHeader,
+ retArray);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
index c207ef1a52..eedbe08063 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
@@ -123,7 +123,25 @@ public class ExtremeAccumulator implements Accumulator {
return;
}
initResult = true;
- extremeResult.setObject(finalResult.getObject(0));
+ switch (seriesDataType) {
+ case INT32:
+ extremeResult.setInt(finalResult.getInt(0));
+ break;
+ case INT64:
+ extremeResult.setLong(finalResult.getLong(0));
+ break;
+ case FLOAT:
+ extremeResult.setFloat(finalResult.getFloat(0));
+ break;
+ case DOUBLE:
+ extremeResult.setDouble(finalResult.getDouble(0));
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in Extreme: %s", seriesDataType));
+ }
}
// columnBuilder should be single in ExtremeAccumulator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
index c4a6ff3055..90ba07f543 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -130,8 +130,32 @@ public class FirstValueAccumulator implements Accumulator {
@Override
public void setFinal(Column finalResult) {
reset();
- hasCandidateResult = true;
- firstValue.setObject(finalResult.getObject(0));
+ if (!finalResult.isNull(0)) {
+ hasCandidateResult = true;
+ switch (seriesDataType) {
+ case INT32:
+ firstValue.setInt(finalResult.getInt(0));
+ break;
+ case INT64:
+ firstValue.setLong(finalResult.getLong(0));
+ break;
+ case FLOAT:
+ firstValue.setFloat(finalResult.getFloat(0));
+ break;
+ case DOUBLE:
+ firstValue.setDouble(finalResult.getDouble(0));
+ break;
+ case TEXT:
+ firstValue.setBinary(finalResult.getBinary(0));
+ break;
+ case BOOLEAN:
+ firstValue.setBoolean(finalResult.getBoolean(0));
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in FirstValue: %s", seriesDataType));
+ }
+ }
}
// columnBuilder should be double in FirstValueAccumulator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
index 61079a6394..e198948f49 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -130,7 +130,32 @@ public class LastValueAccumulator implements Accumulator {
@Override
public void setFinal(Column finalResult) {
reset();
- lastValue.setObject(finalResult.getObject(0));
+ if (!finalResult.isNull(0)) {
+ initResult = true;
+ switch (seriesDataType) {
+ case INT32:
+ lastValue.setInt(finalResult.getInt(0));
+ break;
+ case INT64:
+ lastValue.setLong(finalResult.getLong(0));
+ break;
+ case FLOAT:
+ lastValue.setFloat(finalResult.getFloat(0));
+ break;
+ case DOUBLE:
+ lastValue.setDouble(finalResult.getDouble(0));
+ break;
+ case TEXT:
+ lastValue.setBinary(finalResult.getBinary(0));
+ break;
+ case BOOLEAN:
+ lastValue.setBoolean(finalResult.getBoolean(0));
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in LastValue: %s", seriesDataType));
+ }
+ }
}
// columnBuilder should be double in LastValueAccumulator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
index 9380e70a59..768e9cf16f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
@@ -120,7 +120,26 @@ public class MaxValueAccumulator implements Accumulator {
if (finalResult.isNull(0)) {
return;
}
- maxResult.setObject(finalResult.getObject(0));
+ initResult = true;
+ switch (seriesDataType) {
+ case INT32:
+ maxResult.setInt(finalResult.getInt(0));
+ break;
+ case INT64:
+ maxResult.setLong(finalResult.getLong(0));
+ break;
+ case FLOAT:
+ maxResult.setFloat(finalResult.getFloat(0));
+ break;
+ case DOUBLE:
+ maxResult.setDouble(finalResult.getDouble(0));
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+ }
}
// columnBuilder should be single in countAccumulator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
index 5d194a94c7..252ccaf02b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
@@ -82,6 +82,7 @@ public class MinTimeAccumulator implements Accumulator {
if (finalResult.isNull(0)) {
return;
}
+ hasCandidateResult = true;
minTime = finalResult.getLong(0);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
index 9f415930b4..2abd59e6e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
@@ -120,7 +120,26 @@ public class MinValueAccumulator implements Accumulator {
if (finalResult.isNull(0)) {
return;
}
- minResult.setObject(finalResult.getObject(0));
+ initResult = true;
+ switch (seriesDataType) {
+ case INT32:
+ minResult.setInt(finalResult.getInt(0));
+ break;
+ case INT64:
+ minResult.setLong(finalResult.getLong(0));
+ break;
+ case FLOAT:
+ minResult.setFloat(finalResult.getFloat(0));
+ break;
+ case DOUBLE:
+ minResult.setDouble(finalResult.getDouble(0));
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MinValue: %s", seriesDataType));
+ }
}
// columnBuilder should be single in MinValueAccumulator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
index 66d3064c44..bd113d7801 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
@@ -90,6 +90,7 @@ public class SumAccumulator implements Accumulator {
if (finalResult.isNull(0)) {
return;
}
+ initResult = true;
sumValue = finalResult.getDouble(0);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java
new file mode 100644
index 0000000000..2a4ffd58c5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.join;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class VerticallyConcatOperator implements ProcessOperator {
+
+ private final OperatorContext operatorContext;
+
+ private final List<Operator> children;
+
+ private final int inputOperatorsCount;
+
+ /** TsBlock from child operator. Only one cache now. */
+ private final TsBlock[] inputTsBlocks;
+
+ /** start index for each input TsBlocks and size of it is equal to inputTsBlocks */
+ private final int[] inputIndex;
+
+ private final int outputColumnCount;
+
+ private final TsBlockBuilder tsBlockBuilder;
+
+ private boolean finished;
+
+ public VerticallyConcatOperator(
+ OperatorContext operatorContext, List<Operator> children, List<TSDataType> dataTypes) {
+ checkArgument(
+ children != null && children.size() > 0,
+ "child size of VerticallyConcatOperator should be larger than 0");
+ this.operatorContext = operatorContext;
+ this.children = children;
+ this.inputOperatorsCount = children.size();
+ this.inputTsBlocks = new TsBlock[this.inputOperatorsCount];
+ this.inputIndex = new int[this.inputOperatorsCount];
+ this.outputColumnCount = dataTypes.size();
+ this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (empty(i)) {
+ ListenableFuture<?> blocked = children.get(i).isBlocked();
+ if (!blocked.isDone()) {
+ listenableFutures.add(blocked);
+ }
+ }
+ }
+ return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+ }
+
+ @Override
+ public TsBlock next() {
+ tsBlockBuilder.reset();
+ // indicates how many rows can be built in this calculate
+ int maxRowCanBuild = Integer.MAX_VALUE;
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (empty(i)) {
+ inputIndex[i] = 0;
+ inputTsBlocks[i] = children.get(i).next();
+ if (empty(i)) {
+ // child operator has not prepared TsBlock well
+ return null;
+ }
+ }
+ maxRowCanBuild =
+ Math.min(maxRowCanBuild, inputTsBlocks[i].getPositionCount() - inputIndex[i]);
+ }
+
+ TimeColumn firstTimeColumn = inputTsBlocks[0].getTimeColumn();
+ TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+
+ // build TimeColumn according to the first inputTsBlock
+ int currTsBlockIndex = inputIndex[0];
+ for (int row = 0; row < maxRowCanBuild; row++) {
+ timeColumnBuilder.writeLong(firstTimeColumn.getLong(currTsBlockIndex + row));
+ tsBlockBuilder.declarePosition();
+ }
+
+ // build ValueColumns according to inputTsBlocks
+ int valueBuilderIndex = 0; // indicate which valueColumnBuilder should use
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ currTsBlockIndex = inputIndex[i];
+ for (Column column : inputTsBlocks[i].getValueColumns()) {
+ for (int row = 0; row < maxRowCanBuild; row++) {
+ if (column.isNull(currTsBlockIndex + row)) {
+ valueColumnBuilders[valueBuilderIndex].appendNull();
+ } else {
+ valueColumnBuilders[valueBuilderIndex].write(column, currTsBlockIndex + row);
+ }
+ }
+ valueBuilderIndex++;
+ }
+ inputIndex[i] += maxRowCanBuild;
+ }
+ return tsBlockBuilder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (finished) {
+ return false;
+ }
+ return !empty(0) || children.get(0).hasNext();
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (Operator child : children) {
+ child.close();
+ }
+ }
+
+ @Override
+ public boolean isFinished() {
+ if (finished) {
+ return true;
+ }
+ return finished = empty(0) && !children.get(0).hasNext();
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = 0;
+ long childrenMaxPeekMemory = 0;
+ for (Operator child : children) {
+ childrenMaxPeekMemory =
+ Math.max(childrenMaxPeekMemory, maxPeekMemory + child.calculateMaxPeekMemory());
+ maxPeekMemory +=
+ (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
+ }
+
+ maxPeekMemory += calculateMaxReturnSize();
+ return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // time + all value columns
+ return (1L + outputColumnCount)
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+ for (Operator child : children) {
+ long maxReturnSize = child.calculateMaxReturnSize();
+ currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+ minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+ }
+ // max cached TsBlock
+ return currentRetainedSize - minChildReturnSize;
+ }
+
+ /**
+ * If the tsBlock of tsBlockIndex is null or has no more data in the tsBlock, return true; else
+ * return false;
+ */
+ private boolean empty(int tsBlockIndex) {
+ return inputTsBlocks[tsBlockIndex] == null
+ || inputTsBlocks[tsBlockIndex].getPositionCount() == inputIndex[tsBlockIndex];
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index e0438e2668..1a22949343 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -76,6 +76,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPrevi
import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.LongPreviousFill;
import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.VerticallyConcatOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
@@ -145,11 +146,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -1533,7 +1536,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
TimeJoinOperator.class.getSimpleName());
TimeComparator timeComparator =
node.getMergeOrder() == Ordering.ASC ? ASC_TIME_COMPARATOR : DESC_TIME_COMPARATOR;
- List<OutputColumn> outputColumns = generateOutputColumns(node);
+ List<OutputColumn> outputColumns = generateOutputColumnsFromChildren(node);
List<ColumnMerger> mergers = createColumnMergers(outputColumns, timeComparator);
List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
@@ -1547,7 +1550,27 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
timeComparator);
}
- private List<OutputColumn> generateOutputColumns(TimeJoinNode node) {
+ @Override
+ public Operator visitVerticallyConcat(
+ VerticallyConcatNode node, LocalExecutionPlanContext context) {
+ List<Operator> children =
+ node.getChildren().stream()
+ .map(child -> child.accept(this, context))
+ .collect(Collectors.toList());
+ OperatorContext operatorContext =
+ context
+ .getInstanceContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ VerticallyConcatOperator.class.getSimpleName());
+ List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
+
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+ return new VerticallyConcatOperator(operatorContext, children, outputColumnTypes);
+ }
+
+ private List<OutputColumn> generateOutputColumnsFromChildren(MultiChildProcessNode node) {
// TODO we should also sort the InputLocation for each column if they are not overlapped
return makeLayout(node).values().stream()
.map(inputLocations -> new OutputColumn(inputLocations, inputLocations.size() > 1))
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 7a1a186861..a9590d995a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessN
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -252,10 +253,16 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return processOneChildNode(node, context);
}
+ @Override
public PlanNode visitGroupByTag(GroupByTagNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
}
+ @Override
+ public PlanNode visitVerticallyConcat(VerticallyConcatNode node, NodeGroupContext context) {
+ return processMultiChildNode(node, context);
+ }
+
private PlanNode processDeviceViewWithAggregation(DeviceViewNode node, NodeGroupContext context) {
// group all the children by DataRegion distribution
Map<TRegionReplicaSet, DeviceViewGroup> deviceViewGroupMap = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 8224d931a4..85e331ad22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -597,61 +598,94 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
private PlanNode planAggregationWithTimeJoin(TimeJoinNode root, DistributionPlanContext context) {
-
- List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
- Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
- sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-
- // construct AggregationDescriptor for AggregationNode
- List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
- for (PlanNode child : root.getChildren()) {
- SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
- handle
- .getAggregationDescriptorList()
- .forEach(
- descriptor -> {
- rootAggDescriptorList.add(
- new AggregationDescriptor(
- descriptor.getAggregationFuncName(),
- context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE,
- descriptor.getInputExpressions()));
- });
+ Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup;
+
+ // construct newRoot
+ MultiChildProcessNode newRoot;
+ if (context.isRoot) {
+ // This node is the root of PlanTree,
+ // if the aggregated series have only one region,
+ // upstream will give the final aggregate result,
+ // the step of this series' aggregator will be `STATIC`
+ List<SeriesAggregationSourceNode> sources = new ArrayList<>();
+ Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
+ boolean[] eachSeriesOneRegion = {true};
+ sourceGroup =
+ splitAggregationSourceByPartition(
+ root, context, sources, eachSeriesOneRegion, regionCountPerSeries);
+
+ if (eachSeriesOneRegion[0]) {
+ newRoot = new VerticallyConcatNode(context.queryContext.getQueryId().genPlanNodeId());
+ } else {
+ List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
+ for (PlanNode child : root.getChildren()) {
+ SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+ handle
+ .getAggregationDescriptorList()
+ .forEach(
+ descriptor ->
+ rootAggDescriptorList.add(
+ new AggregationDescriptor(
+ descriptor.getAggregationFuncName(),
+ regionCountPerSeries.get(handle.getPartitionPath()) == 1
+ ? AggregationStep.STATIC
+ : AggregationStep.FINAL,
+ descriptor.getInputExpressions())));
+ }
+ SeriesAggregationSourceNode seed = (SeriesAggregationSourceNode) root.getChildren().get(0);
+ newRoot =
+ new AggregationNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ rootAggDescriptorList,
+ seed.getGroupByTimeParameter(),
+ seed.getScanOrder());
+ }
+ } else {
+ // If this node is not the root node of PlanTree,
+ // it declares that there have something to do at downstream,
+ // the step of this AggregationNode should be `INTERMEDIATE`
+ sourceGroup = splitAggregationSourceByPartition(root, context);
+ List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
+ for (PlanNode child : root.getChildren()) {
+ SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+ handle
+ .getAggregationDescriptorList()
+ .forEach(
+ descriptor ->
+ rootAggDescriptorList.add(
+ new AggregationDescriptor(
+ descriptor.getAggregationFuncName(),
+ AggregationStep.INTERMEDIATE,
+ descriptor.getInputExpressions())));
+ }
+ SeriesAggregationSourceNode seed = (SeriesAggregationSourceNode) root.getChildren().get(0);
+ newRoot =
+ new AggregationNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ rootAggDescriptorList,
+ seed.getGroupByTimeParameter(),
+ seed.getScanOrder());
}
- rootAggDescriptorList.forEach(
- d ->
- LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
- d, context.queryContext.getTypeProvider()));
- checkArgument(
- sources.size() > 0, "Aggregation sources should not be empty when distribution planning");
- SeriesAggregationSourceNode seed = sources.get(0);
- AggregationNode aggregationNode =
- new AggregationNode(
- context.queryContext.getQueryId().genPlanNodeId(),
- rootAggDescriptorList,
- seed.getGroupByTimeParameter(),
- seed.getScanOrder());
- final boolean[] addParent = {false};
+ boolean[] addParent = {false};
sourceGroup.forEach(
(dataRegion, sourceNodes) -> {
if (sourceNodes.size() == 1) {
- aggregationNode.addChild(sourceNodes.get(0));
+ newRoot.addChild(sourceNodes.get(0));
} else {
if (!addParent[0]) {
- sourceNodes.forEach(aggregationNode::addChild);
+ sourceNodes.forEach(newRoot::addChild);
addParent[0] = true;
} else {
- // We clone a TimeJoinNode from root to make the params to be consistent.
- // But we need to assign a new ID to it
- TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
- parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ VerticallyConcatNode parentOfGroup =
+ new VerticallyConcatNode(context.queryContext.getQueryId().genPlanNodeId());
sourceNodes.forEach(parentOfGroup::addChild);
- aggregationNode.addChild(parentOfGroup);
+ newRoot.addChild(parentOfGroup);
}
}
});
- return aggregationNode;
+ return newRoot;
}
@Override
@@ -660,9 +694,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return defaultRewrite(root, context);
}
// Firstly, we build the tree structure for GroupByLevelNode
- List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
- sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+ splitAggregationSourceByPartition(root, context);
boolean containsSlidingWindow =
root.getChildren().size() == 1
@@ -684,9 +717,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
@Override
public PlanNode visitGroupByTag(GroupByTagNode root, DistributionPlanContext context) {
- List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
- sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+ splitAggregationSourceByPartition(root, context);
boolean containsSlidingWindow =
root.getChildren().size() == 1
@@ -730,11 +762,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
if (sourceNodes.size() == 1) {
parentOfGroup.addChild(sourceNodes.get(0));
} else {
- TimeJoinNode timeJoinNode =
- new TimeJoinNode(
- context.queryContext.getQueryId().genPlanNodeId(), root.getScanOrder());
- sourceNodes.forEach(timeJoinNode::addChild);
- parentOfGroup.addChild(timeJoinNode);
+ VerticallyConcatNode verticallyConcatNode =
+ new VerticallyConcatNode(context.queryContext.getQueryId().genPlanNodeId());
+ sourceNodes.forEach(verticallyConcatNode::addChild);
+ parentOfGroup.addChild(verticallyConcatNode);
}
groups.add(parentOfGroup);
});
@@ -871,11 +902,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
if (sourceNodes.size() == 1) {
parentOfGroup.addChild(sourceNodes.get(0));
} else {
- PlanNode timeJoinNode =
- new TimeJoinNode(
- context.queryContext.getQueryId().genPlanNodeId(), root.getScanOrder());
- sourceNodes.forEach(timeJoinNode::addChild);
- parentOfGroup.addChild(timeJoinNode);
+ VerticallyConcatNode verticallyConcatNode =
+ new VerticallyConcatNode(context.queryContext.getQueryId().genPlanNodeId());
+ sourceNodes.forEach(verticallyConcatNode::addChild);
+ parentOfGroup.addChild(verticallyConcatNode);
}
newRoot.addChild(parentOfGroup);
});
@@ -897,11 +927,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
sourceNodes.forEach(newRoot::addChild);
addParent[0] = true;
} else {
- PlanNode timeJoinNode =
- new TimeJoinNode(
- context.queryContext.getQueryId().genPlanNodeId(), root.getScanOrder());
- sourceNodes.forEach(timeJoinNode::addChild);
- newRoot.addChild(timeJoinNode);
+ VerticallyConcatNode verticallyConcatNode =
+ new VerticallyConcatNode(context.queryContext.getQueryId().genPlanNodeId());
+ sourceNodes.forEach(verticallyConcatNode::addChild);
+ newRoot.addChild(verticallyConcatNode);
}
}
});
@@ -916,45 +945,96 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return columnName.contains(expression.getExpressionString());
}
- private List<SeriesAggregationSourceNode> splitAggregationSourceByPartition(
- PlanNode root, DistributionPlanContext context) {
+ /**
+ * This method is used for rewriting second step AggregationNode like GroupByLevelNode,
+ * GroupByTagNode and SlidingWindowAggregationNode, so the first step AggregationNode's step will
+ * be {@link AggregationStep#PARTIAL}
+ */
+ private Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>>
+ splitAggregationSourceByPartition(PlanNode root, DistributionPlanContext context) {
// Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
List<SeriesAggregationSourceNode> rawSources = findAggregationSourceNode(root);
- // Step 1: split SeriesAggregationSourceNode according to data partition
+
+ // Step 1: construct SeriesAggregationSourceNode for each data region of one Path
List<SeriesAggregationSourceNode> sources = new ArrayList<>();
- Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
for (SeriesAggregationSourceNode child : rawSources) {
- List<TRegionReplicaSet> dataDistribution =
- analysis.getPartitionInfo(child.getPartitionPath(), child.getPartitionTimeFilter());
- for (TRegionReplicaSet dataRegion : dataDistribution) {
- SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) child.clone();
- split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
- split.setRegionReplicaSet(dataRegion);
- // Let each split reference different object of AggregationDescriptorList
- split.setAggregationDescriptorList(
- child.getAggregationDescriptorList().stream()
- .map(AggregationDescriptor::deepClone)
- .collect(Collectors.toList()));
- sources.add(split);
- }
- regionCountPerSeries.put(child.getPartitionPath(), dataDistribution.size());
+ constructAggregationSourceNodeForPerRegion(context, sources, child);
}
- // Step 2: change the step for each SeriesAggregationSourceNode according to its split count
+ // Step 2: change step to PARTIAL for each SeriesAggregationSourceNode and update TypeProvider
for (SeriesAggregationSourceNode source : sources) {
- // boolean isFinal = regionCountPerSeries.get(source.getPartitionPath()) == 1;
- // TODO: (xingtanzjr) need to optimize this step later. We make it as Partial now.
- boolean isFinal = false;
source
.getAggregationDescriptorList()
.forEach(
d -> {
- d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL);
+ d.setStep(AggregationStep.PARTIAL);
LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
d, context.queryContext.getTypeProvider());
});
}
- return sources;
+
+ // Step 3: group all SeriesAggregationSourceNode by each region
+ return sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+ }
+
+ private Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>>
+ splitAggregationSourceByPartition(
+ PlanNode root,
+ DistributionPlanContext context,
+ List<SeriesAggregationSourceNode> sources,
+ boolean[] eachSeriesOneRegion,
+ Map<PartialPath, Integer> regionCountPerSeries) {
+ // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
+ List<SeriesAggregationSourceNode> rawSources = findAggregationSourceNode(root);
+
+ // Step 1: construct SeriesAggregationSourceNode for each data region of one Path
+ for (SeriesAggregationSourceNode child : rawSources) {
+ regionCountPerSeries.put(
+ child.getPartitionPath(),
+ constructAggregationSourceNodeForPerRegion(context, sources, child));
+ }
+
+ // Step 2: change step to SINGLE or PARTIAL for each SeriesAggregationSourceNode and update
+ // TypeProvider
+ for (SeriesAggregationSourceNode source : sources) {
+ boolean isSingle = regionCountPerSeries.get(source.getPartitionPath()) == 1;
+ source
+ .getAggregationDescriptorList()
+ .forEach(
+ d -> {
+ if (isSingle) {
+ d.setStep(AggregationStep.SINGLE);
+ } else {
+ eachSeriesOneRegion[0] = false;
+ d.setStep(AggregationStep.PARTIAL);
+ LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+ d, context.queryContext.getTypeProvider());
+ }
+ });
+ }
+
+ // Step 3: group all SeriesAggregationSourceNode by each region
+ return sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+ }
+
+ private int constructAggregationSourceNodeForPerRegion(
+ DistributionPlanContext context,
+ List<SeriesAggregationSourceNode> sources,
+ SeriesAggregationSourceNode child) {
+ List<TRegionReplicaSet> dataDistribution =
+ analysis.getPartitionInfo(child.getPartitionPath(), child.getPartitionTimeFilter());
+ for (TRegionReplicaSet dataRegion : dataDistribution) {
+ SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) child.clone();
+ split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ split.setRegionReplicaSet(dataRegion);
+ // Let each split reference different object of AggregationDescriptorList
+ split.setAggregationDescriptorList(
+ child.getAggregationDescriptorList().stream()
+ .map(AggregationDescriptor::deepClone)
+ .collect(Collectors.toList()));
+ sources.add(split);
+ }
+ return dataDistribution.size();
}
private List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode node) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index c929512dae..1ea9a94440 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -394,6 +395,13 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
return render(node, boxValue, context);
}
+ @Override
+ public List<String> visitVerticallyConcat(VerticallyConcatNode node, GraphContext context) {
+ List<String> boxValue = new ArrayList<>();
+ boxValue.add(String.format("VerticallyConcat-%s", node.getPlanNodeId().getId()));
+ return render(node, boxValue, context);
+ }
+
private String printRegion(TRegionReplicaSet regionReplicaSet) {
return String.format(
"Partition: %s",
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index bad8725978..0f846ad25b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -150,7 +151,8 @@ public enum PlanNodeType {
ROLLBACK_PRE_DEACTIVATE_TEMPLATE_NODE((short) 60),
DEACTIVATE_TEMPLATE_NODE((short) 61),
INTO((short) 62),
- DEVICE_VIEW_INTO((short) 63);
+ DEVICE_VIEW_INTO((short) 63),
+ VERTICALLY_CONCAT((short) 64);
public static final int BYTES = Short.BYTES;
@@ -327,6 +329,8 @@ public enum PlanNodeType {
return IntoNode.deserialize(buffer);
case 63:
return DeviceViewIntoNode.deserialize(buffer);
+ case 64:
+ return VerticallyConcatNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 788649ffc8..8bf41c86ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -326,4 +327,8 @@ public abstract class PlanVisitor<R, C> {
public R visitDeviceViewInto(DeviceViewIntoNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitVerticallyConcat(VerticallyConcatNode node, C context) {
+ return visitPlan(node, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
new file mode 100644
index 0000000000..24760a4228
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * This node is responsible for merge two or more TsBlocks have been aligned. TsBlocks must have
+ * same time column or no time column at all, and we can merge the value column directly without
+ * compare.
+ */
+public class VerticallyConcatNode extends MultiChildProcessNode {
+
+ public VerticallyConcatNode(PlanNodeId id) {
+ super(id, new ArrayList<>());
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new VerticallyConcatNode(getPlanNodeId());
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return children.stream()
+ .map(PlanNode::getOutputColumnNames)
+ .flatMap(List::stream)
+ .distinct()
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitVerticallyConcat(this, context);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.VERTICALLY_CONCAT.serialize(byteBuffer);
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws IOException {
+ PlanNodeType.VERTICALLY_CONCAT.serialize(stream);
+ }
+
+ public static VerticallyConcatNode deserialize(ByteBuffer byteBuffer) {
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new VerticallyConcatNode(planNodeId);
+ }
+
+ @Override
+ public String toString() {
+ return "VerticallyConcatNode-" + this.getPlanNodeId();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ VerticallyConcatNode that = (VerticallyConcatNode) o;
+ return children.equals(that.children);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), children);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationStep.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationStep.java
index 9e7c3e701a..04cab22a27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationStep.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationStep.java
@@ -39,15 +39,15 @@ import java.nio.ByteBuffer;
public enum AggregationStep {
// input Raw, output Partial
- PARTIAL(InputType.RAW, true),
+ PARTIAL(InputType.RAW, true, (byte) 0),
// input Partial, output Final
- FINAL(InputType.PARTIAL, false),
+ FINAL(InputType.PARTIAL, false, (byte) 1),
// input Partial, output Partial
- INTERMEDIATE(InputType.PARTIAL, true),
+ INTERMEDIATE(InputType.PARTIAL, true, (byte) 2),
// input Raw, output Final
- SINGLE(InputType.RAW, false),
+ SINGLE(InputType.RAW, false, (byte) 3),
// input final, output final
- STATIC(InputType.FINAL, false);
+ STATIC(InputType.FINAL, false, (byte) 4);
private enum InputType {
RAW,
@@ -57,10 +57,12 @@ public enum AggregationStep {
private final InputType inputType;
private final boolean outputPartial;
+ private final byte ordinal;
- AggregationStep(InputType inputType, boolean outputPartial) {
+ AggregationStep(InputType inputType, boolean outputPartial, byte ordinal) {
this.inputType = inputType;
this.outputPartial = outputPartial;
+ this.ordinal = ordinal;
}
public boolean isInputRaw() {
@@ -80,27 +82,28 @@ public enum AggregationStep {
}
public void serialize(ByteBuffer byteBuffer) {
- ReadWriteIOUtils.write(isInputRaw(), byteBuffer);
- ReadWriteIOUtils.write(isOutputPartial(), byteBuffer);
+ ReadWriteIOUtils.write(ordinal, byteBuffer);
}
public void serialize(DataOutputStream stream) throws IOException {
- ReadWriteIOUtils.write(isInputRaw(), stream);
- ReadWriteIOUtils.write(isOutputPartial(), stream);
+ ReadWriteIOUtils.write(ordinal, stream);
}
public static AggregationStep deserialize(ByteBuffer byteBuffer) {
- boolean isInputRaw = ReadWriteIOUtils.readBool(byteBuffer);
- boolean isOutputPartial = ReadWriteIOUtils.readBool(byteBuffer);
- if (isInputRaw && isOutputPartial) {
- return AggregationStep.PARTIAL;
+ byte type = ReadWriteIOUtils.readByte(byteBuffer);
+ switch (type) {
+ case 0:
+ return AggregationStep.PARTIAL;
+ case 1:
+ return AggregationStep.FINAL;
+ case 2:
+ return AggregationStep.INTERMEDIATE;
+ case 3:
+ return AggregationStep.SINGLE;
+ case 4:
+ return AggregationStep.STATIC;
+ default:
+ throw new IllegalArgumentException("Invalid AggregationStep type: " + type);
}
- if (!isInputRaw && isOutputPartial) {
- return AggregationStep.INTERMEDIATE;
- }
- if (isInputRaw) {
- return AggregationStep.SINGLE;
- }
- return AggregationStep.FINAL;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/VerticallyConcatOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/VerticallyConcatOperatorTest.java
new file mode 100644
index 0000000000..988b95a172
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/VerticallyConcatOperatorTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.VerticallyConcatOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import io.airlift.units.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class VerticallyConcatOperatorTest {
+ private static final String VERTICALLY_CONCAT_OPERATOR_TEST_SG =
+ "root.VerticallyConcatOperatorTest";
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+ @Before
+ public void setUp() throws MetadataException, IOException, WriteProcessException {
+ SeriesReaderTestUtil.setUp(
+ measurementSchemas,
+ deviceIds,
+ seqResources,
+ unSeqResources,
+ VERTICALLY_CONCAT_OPERATOR_TEST_SG);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ }
+
+ @Test
+ public void batchTest1() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ Set<String> allSensors = new HashSet<>();
+ allSensors.add("sensor0");
+ allSensors.add("sensor1");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, SeriesAggregationScanOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 3, new PlanNodeId("3"), VerticallyConcatOperator.class.getSimpleName());
+
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(
+ VERTICALLY_CONCAT_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ List<AggregationType> aggregationTypes =
+ Arrays.asList(AggregationType.COUNT, AggregationType.SUM, AggregationType.FIRST_VALUE);
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 10, 1, 1, true);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
+ SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+ new SeriesAggregationScanOperator(
+ planNodeId1,
+ measurementPath1,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
+ null,
+ true,
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ seriesAggregationScanOperator1.initQueryDataSource(
+ new QueryDataSource(seqResources, unSeqResources));
+ seriesAggregationScanOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ MeasurementPath measurementPath2 =
+ new MeasurementPath(
+ VERTICALLY_CONCAT_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+ SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+ new SeriesAggregationScanOperator(
+ planNodeId2,
+ measurementPath2,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
+ null,
+ true,
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ seriesAggregationScanOperator2.initQueryDataSource(
+ new QueryDataSource(seqResources, unSeqResources));
+ seriesAggregationScanOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ VerticallyConcatOperator verticallyConcatOperator =
+ new VerticallyConcatOperator(
+ fragmentInstanceContext.getOperatorContexts().get(2),
+ Arrays.asList(seriesAggregationScanOperator1, seriesAggregationScanOperator2),
+ Arrays.asList(
+ TSDataType.INT64,
+ TSDataType.DOUBLE,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.DOUBLE,
+ TSDataType.INT32));
+
+ int count = 0;
+ while (verticallyConcatOperator.hasNext()) {
+ TsBlock tsBlock = verticallyConcatOperator.next();
+ assertEquals(6, tsBlock.getValueColumnCount());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
+ assertEquals(count, tsBlock.getTimeByIndex(i));
+ assertEquals(1, tsBlock.getColumn(0).getLong(i));
+ assertEquals(20000 + count, tsBlock.getColumn(1).getDouble(i), 0.00001);
+ assertEquals(20000 + count, tsBlock.getColumn(2).getInt(i));
+ assertEquals(1, tsBlock.getColumn(3).getLong(i));
+ assertEquals(20000 + count, tsBlock.getColumn(4).getDouble(i), 0.00001);
+ assertEquals(20000 + count, tsBlock.getColumn(5).getInt(i));
+ }
+ }
+ assertEquals(10, count);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 8074203fc2..991cebb099 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
@@ -144,14 +145,19 @@ public class AggregationDistributionTest {
new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
DistributedQueryPlan plan = planner.planFragments();
assertEquals(3, plan.getInstances().size());
+
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ List<AggregationStep> expected = Arrays.asList(AggregationStep.STATIC, AggregationStep.FINAL);
+ verifyAggregationStep(expected, fragmentInstances.get(0).getFragment().getPlanNodeTree());
+
Map<String, AggregationStep> expectedStep = new HashMap<>();
expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
- expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
- List<FragmentInstance> fragmentInstances = plan.getInstances();
+ expectedStep.put(d2s1Path, AggregationStep.SINGLE);
fragmentInstances.forEach(
f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
}
+ // verify SeriesAggregationSourceNode
private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) {
if (root == null) {
return;
@@ -167,6 +173,22 @@ public class AggregationDistributionTest {
root.getChildren().forEach(child -> verifyAggregationStep(expected, child));
}
+ // verify AggregationNode
+ private void verifyAggregationStep(List<AggregationStep> expected, PlanNode root) {
+ if (root == null) {
+ return;
+ }
+ if (root instanceof AggregationNode) {
+ List<AggregationStep> actual =
+ ((AggregationNode) root)
+ .getAggregationDescriptorList().stream()
+ .map(AggregationDescriptor::getStep)
+ .collect(Collectors.toList());
+ assertEquals(expected, actual);
+ }
+ root.getChildren().forEach(child -> verifyAggregationStep(expected, child));
+ }
+
@Test
public void testTimeJoinAggregationWithSlidingWindow() throws IllegalPathException {
QueryId queryId = new QueryId("test_query_time_join_agg_with_sliding");
@@ -391,10 +413,15 @@ public class AggregationDistributionTest {
new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
DistributedQueryPlan plan = planner.planFragments();
assertEquals(2, plan.getInstances().size());
+
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ List<AggregationStep> expected = Arrays.asList(AggregationStep.FINAL, AggregationStep.FINAL);
+ verifyAggregationStep(expected, fragmentInstances.get(0).getFragment().getPlanNodeTree());
+
Map<String, AggregationStep> expectedStep = new HashMap<>();
expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
- List<FragmentInstance> fragmentInstances = plan.getInstances();
+
fragmentInstances.forEach(
f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
@@ -707,7 +734,7 @@ public class AggregationDistributionTest {
PlanNode f2Root =
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
assertTrue(f1Root instanceof DeviceViewNode);
- assertTrue(f2Root instanceof TimeJoinNode);
+ assertTrue(f2Root instanceof VerticallyConcatNode);
assertTrue(f1Root.getChildren().get(0) instanceof AggregationNode);
assertEquals(3, f1Root.getChildren().get(0).getChildren().size());
}
@@ -731,9 +758,9 @@ public class AggregationDistributionTest {
PlanNode f3Root =
plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0);
assertTrue(f1Root instanceof DeviceMergeNode);
- assertTrue(f2Root instanceof TimeJoinNode);
+ assertTrue(f2Root instanceof VerticallyConcatNode);
assertTrue(f3Root instanceof DeviceViewNode);
- assertTrue(f3Root.getChildren().get(0) instanceof AggregationNode);
+ assertTrue(f3Root.getChildren().get(0) instanceof VerticallyConcatNode);
assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof AggregationNode);
assertEquals(3, f1Root.getChildren().get(0).getChildren().get(0).getChildren().size());
@@ -756,7 +783,7 @@ public class AggregationDistributionTest {
PlanNode f2Root =
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
assertTrue(f1Root instanceof DeviceViewNode);
- assertTrue(f2Root instanceof TimeJoinNode);
+ assertTrue(f2Root instanceof VerticallyConcatNode);
assertEquals(2, f1Root.getChildren().size());
}
@@ -832,4 +859,33 @@ public class AggregationDistributionTest {
DistributedQueryPlan plan = planner.planFragments();
assertEquals(3, plan.getInstances().size());
}
+
+ @Test
+ public void testEachSeriesOneRegion() {
+ QueryId queryId = new QueryId("test_each_series_1_region");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ String sql = "select count(s1), count(s2) from root.sg.d22, root.sg.d55555";
+ Analysis analysis = Util.analyze(sql, context);
+ PlanNode logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(
+ fragmentInstance ->
+ assertTrue(
+ fragmentInstance.getFragment().getPlanNodeTree().getChildren().get(0)
+ instanceof VerticallyConcatNode));
+
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put("root.sg.d22.s1", AggregationStep.SINGLE);
+ expectedStep.put("root.sg.d22.s2", AggregationStep.SINGLE);
+ expectedStep.put("root.sg.d55555.s1", AggregationStep.SINGLE);
+ expectedStep.put("root.sg.d55555.s2", AggregationStep.SINGLE);
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
+ }
}