You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/06 00:37:59 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0] [IOTDB-4902]Optimize process logic for aggregation when there is only one data region

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new cbf72915ff [To rel/1.0] [IOTDB-4902]Optimize process logic for aggregation when there is only one data region
cbf72915ff is described below

commit cbf72915fffceafb6ab2f0261b66e561eadeb445
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Tue Dec 6 08:37:53 2022 +0800

    [To rel/1.0] [IOTDB-4902]Optimize process logic for aggregation when there is only one data region
---
 .../it/aggregation/IoTDBAggregationOptimizeIT.java | 181 +++++++++++++++
 .../db/mpp/aggregation/ExtremeAccumulator.java     |  20 +-
 .../db/mpp/aggregation/FirstValueAccumulator.java  |  28 ++-
 .../db/mpp/aggregation/LastValueAccumulator.java   |  27 ++-
 .../db/mpp/aggregation/MaxValueAccumulator.java    |  21 +-
 .../db/mpp/aggregation/MinTimeAccumulator.java     |   1 +
 .../db/mpp/aggregation/MinValueAccumulator.java    |  21 +-
 .../iotdb/db/mpp/aggregation/SumAccumulator.java   |   1 +
 .../process/join/VerticallyConcatOperator.java     | 207 +++++++++++++++++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  27 ++-
 .../planner/distribution/ExchangeNodeAdder.java    |   7 +
 .../plan/planner/distribution/SourceRewriter.java  | 248 ++++++++++++++-------
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   8 +
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../plan/node/process/VerticallyConcatNode.java    | 103 +++++++++
 .../planner/plan/parameter/AggregationStep.java    |  45 ++--
 .../operator/VerticallyConcatOperatorTest.java     | 197 ++++++++++++++++
 .../distribution/AggregationDistributionTest.java  |  70 +++++-
 19 files changed, 1102 insertions(+), 121 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationOptimizeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationOptimizeIT.java
new file mode 100644
index 0000000000..05b4a84326
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationOptimizeIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualWithDescOrderTest;
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+import static org.apache.iotdb.itbase.constant.TestConstant.avg;
+import static org.apache.iotdb.itbase.constant.TestConstant.count;
+import static org.apache.iotdb.itbase.constant.TestConstant.extreme;
+import static org.apache.iotdb.itbase.constant.TestConstant.firstValue;
+import static org.apache.iotdb.itbase.constant.TestConstant.lastValue;
+import static org.apache.iotdb.itbase.constant.TestConstant.maxTime;
+import static org.apache.iotdb.itbase.constant.TestConstant.maxValue;
+import static org.apache.iotdb.itbase.constant.TestConstant.minTime;
+import static org.apache.iotdb.itbase.constant.TestConstant.minValue;
+import static org.apache.iotdb.itbase.constant.TestConstant.sum;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBAggregationOptimizeIT {
+  private static final String[] SQLs =
+      new String[] {
+        "CREATE DATABASE root.test",
+        "CREATE TIMESERIES root.test.1region_d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.test.1region_d1.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.test.1region_d2.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.test.1region_d2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.test.2region_d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.test.2region_d1.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.test.2region_d2.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.test.2region_d2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "INSERT INTO root.test.1region_d1(timestamp,s1,s2) values(1, 1, 1)",
+        "INSERT INTO root.test.1region_d2(timestamp,s1,s2) values(1, 1, 1)",
+        "INSERT INTO root.test.1region_d1(timestamp,s1,s2) values(2, 2, 2)",
+        "INSERT INTO root.test.1region_d2(timestamp,s1,s2) values(2, 2, 2)",
+        "INSERT INTO root.test.2region_d1(timestamp,s1,s2) values(1, 1, 1)",
+        "INSERT INTO root.test.2region_d2(timestamp,s1,s2) values(1, 1, 1)",
+        "INSERT INTO root.test.2region_d1(timestamp,s1,s2) values(1000000000000, 1, 1)",
+        "INSERT INTO root.test.2region_d2(timestamp,s1,s2) values(1000000000000, 1, 1)",
+        "flush",
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeClass();
+    prepareData(SQLs);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  @Test
+  public void testEachSeriesOneRegion() {
+    String[] expectedHeader =
+        new String[] {
+          count("root.test.1region_d1.s1"),
+          count("root.test.1region_d2.s1"),
+          count("root.test.1region_d1.s2"),
+          count("root.test.1region_d2.s2")
+        };
+    String[] retArray = new String[] {"2,2,2,2,"};
+    resultSetEqualWithDescOrderTest(
+        "select count(s1), count(s2) from root.test.1region_d1, root.test.1region_d2",
+        expectedHeader,
+        retArray);
+  }
+
+  @Test
+  public void testSomeSeriesOneRegion() {
+    String[] expectedHeader =
+        new String[] {count("root.test.1region_d1.s1"), count("root.test.2region_d2.s1")};
+    String[] retArray = new String[] {"2,2,"};
+    resultSetEqualWithDescOrderTest(
+        "select count(s1) from root.test.1region_d1, root.test.2region_d2",
+        expectedHeader,
+        retArray);
+  }
+
+  @Test
+  public void testAggregationResultConcatOneRegion() {
+    String[] expectedHeader =
+        new String[] {count("root.test.2region_d1.s1"), count("root.test.2region_d1.s2")};
+    String[] retArray = new String[] {"2,2,"};
+    resultSetEqualWithDescOrderTest(
+        "select count(s1),count(s2) from root.test.2region_d1", expectedHeader, retArray);
+  }
+
+  @Test
+  public void mixTest() {
+    String[] expectedHeader =
+        new String[] {
+          count("root.test.2region_d1.s1"),
+          count("root.test.2region_d2.s1"),
+          count("root.test.1region_d2.s1"),
+          count("root.test.1region_d1.s1"),
+          count("root.test.2region_d1.s2"),
+          count("root.test.2region_d2.s2"),
+          count("root.test.1region_d2.s2"),
+          count("root.test.1region_d1.s2")
+        };
+    String[] retArray = new String[] {"2,2,2,2,2,2,2,2,"};
+    resultSetEqualWithDescOrderTest(
+        "select count(s1),count(s2) from root.test.**", expectedHeader, retArray);
+
+    expectedHeader =
+        new String[] {
+          TIMESTAMP_STR,
+          sum("root.test.2region_d1.s1"),
+          sum("root.test.2region_d2.s1"),
+          sum("root.test.1region_d2.s1"),
+          sum("root.test.1region_d1.s1"),
+          sum("root.test.2region_d1.s2"),
+          sum("root.test.2region_d2.s2"),
+          sum("root.test.1region_d2.s2"),
+          sum("root.test.1region_d1.s2")
+        };
+    retArray =
+        new String[] {
+          "1,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,", "2,null,null,2.0,2.0,null,null,2.0,2.0,"
+        };
+    resultSetEqualWithDescOrderTest(
+        "select sum(s1),sum(s2) from root.test.** group by ([1,3), 1ms)", expectedHeader, retArray);
+  }
+
+  @Test
+  public void testStaticAggregator() {
+    String[] expectedHeader =
+        new String[] {
+          count("root.test.1region_d1.s1"),
+          sum("root.test.1region_d1.s1"),
+          avg("root.test.1region_d1.s1"),
+          extreme("root.test.1region_d1.s1"),
+          maxValue("root.test.1region_d1.s1"),
+          minValue("root.test.1region_d1.s1"),
+          firstValue("root.test.1region_d1.s1"),
+          lastValue("root.test.1region_d1.s1"),
+          maxTime("root.test.1region_d1.s1"),
+          minTime("root.test.1region_d1.s1"),
+          count("root.test.2region_d1.s1")
+        };
+    String[] retArray = new String[] {"2,3.0,1.5,2,2,1,1,2,2,1,2,"};
+    resultSetEqualWithDescOrderTest(
+        "select count(1region_d1.s1),sum(1region_d1.s1),avg(1region_d1.s1),"
+            + "extreme(1region_d1.s1),max_value(1region_d1.s1),min_value(1region_d1.s1),"
+            + "first_value(1region_d1.s1),last_value(1region_d1.s1),max_time(1region_d1.s1),"
+            + "min_time(1region_d1.s1),count(2region_d1.s1) from root.test",
+        expectedHeader,
+        retArray);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
index c207ef1a52..eedbe08063 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
@@ -123,7 +123,25 @@ public class ExtremeAccumulator implements Accumulator {
       return;
     }
     initResult = true;
-    extremeResult.setObject(finalResult.getObject(0));
+    switch (seriesDataType) {
+      case INT32:
+        extremeResult.setInt(finalResult.getInt(0));
+        break;
+      case INT64:
+        extremeResult.setLong(finalResult.getLong(0));
+        break;
+      case FLOAT:
+        extremeResult.setFloat(finalResult.getFloat(0));
+        break;
+      case DOUBLE:
+        extremeResult.setDouble(finalResult.getDouble(0));
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
   }
 
   // columnBuilder should be single in ExtremeAccumulator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
index c4a6ff3055..90ba07f543 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -130,8 +130,32 @@ public class FirstValueAccumulator implements Accumulator {
   @Override
   public void setFinal(Column finalResult) {
     reset();
-    hasCandidateResult = true;
-    firstValue.setObject(finalResult.getObject(0));
+    if (!finalResult.isNull(0)) {
+      hasCandidateResult = true;
+      switch (seriesDataType) {
+        case INT32:
+          firstValue.setInt(finalResult.getInt(0));
+          break;
+        case INT64:
+          firstValue.setLong(finalResult.getLong(0));
+          break;
+        case FLOAT:
+          firstValue.setFloat(finalResult.getFloat(0));
+          break;
+        case DOUBLE:
+          firstValue.setDouble(finalResult.getDouble(0));
+          break;
+        case TEXT:
+          firstValue.setBinary(finalResult.getBinary(0));
+          break;
+        case BOOLEAN:
+          firstValue.setBoolean(finalResult.getBoolean(0));
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Unsupported data type in FirstValue: %s", seriesDataType));
+      }
+    }
   }
 
   // columnBuilder should be double in FirstValueAccumulator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
index 61079a6394..e198948f49 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -130,7 +130,32 @@ public class LastValueAccumulator implements Accumulator {
   @Override
   public void setFinal(Column finalResult) {
     reset();
-    lastValue.setObject(finalResult.getObject(0));
+    if (!finalResult.isNull(0)) {
+      initResult = true;
+      switch (seriesDataType) {
+        case INT32:
+          lastValue.setInt(finalResult.getInt(0));
+          break;
+        case INT64:
+          lastValue.setLong(finalResult.getLong(0));
+          break;
+        case FLOAT:
+          lastValue.setFloat(finalResult.getFloat(0));
+          break;
+        case DOUBLE:
+          lastValue.setDouble(finalResult.getDouble(0));
+          break;
+        case TEXT:
+          lastValue.setBinary(finalResult.getBinary(0));
+          break;
+        case BOOLEAN:
+          lastValue.setBoolean(finalResult.getBoolean(0));
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Unsupported data type in LastValue: %s", seriesDataType));
+      }
+    }
   }
 
   // columnBuilder should be double in LastValueAccumulator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
index 9380e70a59..768e9cf16f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
@@ -120,7 +120,26 @@ public class MaxValueAccumulator implements Accumulator {
     if (finalResult.isNull(0)) {
       return;
     }
-    maxResult.setObject(finalResult.getObject(0));
+    initResult = true;
+    switch (seriesDataType) {
+      case INT32:
+        maxResult.setInt(finalResult.getInt(0));
+        break;
+      case INT64:
+        maxResult.setLong(finalResult.getLong(0));
+        break;
+      case FLOAT:
+        maxResult.setFloat(finalResult.getFloat(0));
+        break;
+      case DOUBLE:
+        maxResult.setDouble(finalResult.getDouble(0));
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+    }
   }
 
   // columnBuilder should be single in countAccumulator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
index 5d194a94c7..252ccaf02b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
@@ -82,6 +82,7 @@ public class MinTimeAccumulator implements Accumulator {
     if (finalResult.isNull(0)) {
       return;
     }
+    hasCandidateResult = true;
     minTime = finalResult.getLong(0);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
index 9f415930b4..2abd59e6e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
@@ -120,7 +120,26 @@ public class MinValueAccumulator implements Accumulator {
     if (finalResult.isNull(0)) {
       return;
     }
-    minResult.setObject(finalResult.getObject(0));
+    initResult = true;
+    switch (seriesDataType) {
+      case INT32:
+        minResult.setInt(finalResult.getInt(0));
+        break;
+      case INT64:
+        minResult.setLong(finalResult.getLong(0));
+        break;
+      case FLOAT:
+        minResult.setFloat(finalResult.getFloat(0));
+        break;
+      case DOUBLE:
+        minResult.setDouble(finalResult.getDouble(0));
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in MinValue: %s", seriesDataType));
+    }
   }
 
   // columnBuilder should be single in MinValueAccumulator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
index 66d3064c44..bd113d7801 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
@@ -90,6 +90,7 @@ public class SumAccumulator implements Accumulator {
     if (finalResult.isNull(0)) {
       return;
     }
+    initResult = true;
     sumValue = finalResult.getDouble(0);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java
new file mode 100644
index 0000000000..2a4ffd58c5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.join;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class VerticallyConcatOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  private final List<Operator> children;
+
+  private final int inputOperatorsCount;
+
+  /** Cached TsBlock from each child operator; only one block is cached per child. */
+  private final TsBlock[] inputTsBlocks;
+
+  /** Next unread row index in each cached input TsBlock; same length as inputTsBlocks. */
+  private final int[] inputIndex;
+
+  private final int outputColumnCount;
+
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private boolean finished;
+
+  public VerticallyConcatOperator(
+      OperatorContext operatorContext, List<Operator> children, List<TSDataType> dataTypes) {
+    checkArgument(
+        children != null && children.size() > 0,
+        "child size of VerticallyConcatOperator should be larger than 0");
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.inputOperatorsCount = children.size();
+    this.inputTsBlocks = new TsBlock[this.inputOperatorsCount];
+    this.inputIndex = new int[this.inputOperatorsCount];
+    this.outputColumnCount = dataTypes.size();
+    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (empty(i)) {
+        ListenableFuture<?> blocked = children.get(i).isBlocked();
+        if (!blocked.isDone()) {
+          listenableFutures.add(blocked);
+        }
+      }
+    }
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+  }
+
+  @Override
+  public TsBlock next() {
+    tsBlockBuilder.reset();
+    // indicates how many rows can be built in this call
+    int maxRowCanBuild = Integer.MAX_VALUE;
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (empty(i)) {
+        inputIndex[i] = 0;
+        inputTsBlocks[i] = children.get(i).next();
+        if (empty(i)) {
+          // child operator has not produced a non-empty TsBlock yet
+          return null;
+        }
+      }
+      maxRowCanBuild =
+          Math.min(maxRowCanBuild, inputTsBlocks[i].getPositionCount() - inputIndex[i]);
+    }
+
+    TimeColumn firstTimeColumn = inputTsBlocks[0].getTimeColumn();
+    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+
+    // build TimeColumn according to the first inputTsBlock
+    int currTsBlockIndex = inputIndex[0];
+    for (int row = 0; row < maxRowCanBuild; row++) {
+      timeColumnBuilder.writeLong(firstTimeColumn.getLong(currTsBlockIndex + row));
+      tsBlockBuilder.declarePosition();
+    }
+
+    // build ValueColumns according to inputTsBlocks
+    int valueBuilderIndex = 0; // indicate which valueColumnBuilder should use
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      currTsBlockIndex = inputIndex[i];
+      for (Column column : inputTsBlocks[i].getValueColumns()) {
+        for (int row = 0; row < maxRowCanBuild; row++) {
+          if (column.isNull(currTsBlockIndex + row)) {
+            valueColumnBuilders[valueBuilderIndex].appendNull();
+          } else {
+            valueColumnBuilders[valueBuilderIndex].write(column, currTsBlockIndex + row);
+          }
+        }
+        valueBuilderIndex++;
+      }
+      inputIndex[i] += maxRowCanBuild;
+    }
+    return tsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (finished) {
+      return false;
+    }
+    return !empty(0) || children.get(0).hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    if (finished) {
+      return true;
+    }
+    return finished = empty(0) && !children.get(0).hasNext();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 0;
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      childrenMaxPeekMemory =
+          Math.max(childrenMaxPeekMemory, maxPeekMemory + child.calculateMaxPeekMemory());
+      maxPeekMemory +=
+          (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
+    }
+
+    maxPeekMemory += calculateMaxReturnSize();
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // time + all value columns
+    return (1L + outputColumnCount)
+        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+    for (Operator child : children) {
+      long maxReturnSize = child.calculateMaxReturnSize();
+      currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+    }
+    // max cached TsBlock
+    return currentRetainedSize - minChildReturnSize;
+  }
+
+  /**
+   * Returns true if the cached TsBlock at tsBlockIndex is null or has been fully consumed;
+   * returns false otherwise.
+   */
+  private boolean empty(int tsBlockIndex) {
+    return inputTsBlocks[tsBlockIndex] == null
+        || inputTsBlocks[tsBlockIndex].getPositionCount() == inputIndex[tsBlockIndex];
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index e0438e2668..1a22949343 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -76,6 +76,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPrevi
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.LongPreviousFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.VerticallyConcatOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
@@ -145,11 +146,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -1533,7 +1536,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 TimeJoinOperator.class.getSimpleName());
     TimeComparator timeComparator =
         node.getMergeOrder() == Ordering.ASC ? ASC_TIME_COMPARATOR : DESC_TIME_COMPARATOR;
-    List<OutputColumn> outputColumns = generateOutputColumns(node);
+    List<OutputColumn> outputColumns = generateOutputColumnsFromChildren(node);
     List<ColumnMerger> mergers = createColumnMergers(outputColumns, timeComparator);
     List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
 
@@ -1547,7 +1550,27 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         timeComparator);
   }
 
-  private List<OutputColumn> generateOutputColumns(TimeJoinNode node) {
+  @Override
+  public Operator visitVerticallyConcat(
+      VerticallyConcatNode node, LocalExecutionPlanContext context) {
+    List<Operator> children =
+        node.getChildren().stream()
+            .map(child -> child.accept(this, context))
+            .collect(Collectors.toList());
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                VerticallyConcatOperator.class.getSimpleName());
+    List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return new VerticallyConcatOperator(operatorContext, children, outputColumnTypes);
+  }
+
+  private List<OutputColumn> generateOutputColumnsFromChildren(MultiChildProcessNode node) {
     // TODO we should also sort the InputLocation for each column if they are not overlapped
     return makeLayout(node).values().stream()
         .map(inputLocations -> new OutputColumn(inputLocations, inputLocations.size() > 1))
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 7a1a186861..a9590d995a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessN
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -252,10 +253,16 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return processOneChildNode(node, context);
   }
 
+  @Override
   public PlanNode visitGroupByTag(GroupByTagNode node, NodeGroupContext context) {
     return processMultiChildNode(node, context);
   }
 
+  @Override
+  public PlanNode visitVerticallyConcat(VerticallyConcatNode node, NodeGroupContext context) {
+    return processMultiChildNode(node, context);
+  }
+
   private PlanNode processDeviceViewWithAggregation(DeviceViewNode node, NodeGroupContext context) {
     // group all the children by DataRegion distribution
     Map<TRegionReplicaSet, DeviceViewGroup> deviceViewGroupMap = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 8224d931a4..85e331ad22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -597,61 +598,94 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   private PlanNode planAggregationWithTimeJoin(TimeJoinNode root, DistributionPlanContext context) {
-
-    List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
-    Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
-        sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-
-    // construct AggregationDescriptor for AggregationNode
-    List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
-    for (PlanNode child : root.getChildren()) {
-      SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
-      handle
-          .getAggregationDescriptorList()
-          .forEach(
-              descriptor -> {
-                rootAggDescriptorList.add(
-                    new AggregationDescriptor(
-                        descriptor.getAggregationFuncName(),
-                        context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE,
-                        descriptor.getInputExpressions()));
-              });
+    Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup;
+
+    // construct newRoot
+    MultiChildProcessNode newRoot;
+    if (context.isRoot) {
+      // This node is the root of the PlanTree.
+      // If an aggregated series has only one region,
+      // upstream will already give the final aggregation result,
+      // so the step of this series' aggregator will be `STATIC`.
+      List<SeriesAggregationSourceNode> sources = new ArrayList<>();
+      Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
+      boolean[] eachSeriesOneRegion = {true};
+      sourceGroup =
+          splitAggregationSourceByPartition(
+              root, context, sources, eachSeriesOneRegion, regionCountPerSeries);
+
+      if (eachSeriesOneRegion[0]) {
+        newRoot = new VerticallyConcatNode(context.queryContext.getQueryId().genPlanNodeId());
+      } else {
+        List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
+        for (PlanNode child : root.getChildren()) {
+          SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+          handle
+              .getAggregationDescriptorList()
+              .forEach(
+                  descriptor ->
+                      rootAggDescriptorList.add(
+                          new AggregationDescriptor(
+                              descriptor.getAggregationFuncName(),
+                              regionCountPerSeries.get(handle.getPartitionPath()) == 1
+                                  ? AggregationStep.STATIC
+                                  : AggregationStep.FINAL,
+                              descriptor.getInputExpressions())));
+        }
+        SeriesAggregationSourceNode seed = (SeriesAggregationSourceNode) root.getChildren().get(0);
+        newRoot =
+            new AggregationNode(
+                context.queryContext.getQueryId().genPlanNodeId(),
+                rootAggDescriptorList,
+                seed.getGroupByTimeParameter(),
+                seed.getScanOrder());
+      }
+    } else {
+      // If this node is not the root node of the PlanTree,
+      // there is still something left to do downstream,
+      // so the step of this AggregationNode should be `INTERMEDIATE`.
+      sourceGroup = splitAggregationSourceByPartition(root, context);
+      List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
+      for (PlanNode child : root.getChildren()) {
+        SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+        handle
+            .getAggregationDescriptorList()
+            .forEach(
+                descriptor ->
+                    rootAggDescriptorList.add(
+                        new AggregationDescriptor(
+                            descriptor.getAggregationFuncName(),
+                            AggregationStep.INTERMEDIATE,
+                            descriptor.getInputExpressions())));
+      }
+      SeriesAggregationSourceNode seed = (SeriesAggregationSourceNode) root.getChildren().get(0);
+      newRoot =
+          new AggregationNode(
+              context.queryContext.getQueryId().genPlanNodeId(),
+              rootAggDescriptorList,
+              seed.getGroupByTimeParameter(),
+              seed.getScanOrder());
     }
-    rootAggDescriptorList.forEach(
-        d ->
-            LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
-                d, context.queryContext.getTypeProvider()));
-    checkArgument(
-        sources.size() > 0, "Aggregation sources should not be empty when distribution planning");
-    SeriesAggregationSourceNode seed = sources.get(0);
-    AggregationNode aggregationNode =
-        new AggregationNode(
-            context.queryContext.getQueryId().genPlanNodeId(),
-            rootAggDescriptorList,
-            seed.getGroupByTimeParameter(),
-            seed.getScanOrder());
 
-    final boolean[] addParent = {false};
+    boolean[] addParent = {false};
     sourceGroup.forEach(
         (dataRegion, sourceNodes) -> {
           if (sourceNodes.size() == 1) {
-            aggregationNode.addChild(sourceNodes.get(0));
+            newRoot.addChild(sourceNodes.get(0));
           } else {
             if (!addParent[0]) {
-              sourceNodes.forEach(aggregationNode::addChild);
+              sourceNodes.forEach(newRoot::addChild);
               addParent[0] = true;
             } else {
-              // We clone a TimeJoinNode from root to make the params to be consistent.
-              // But we need to assign a new ID to it
-              TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
-              parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+              VerticallyConcatNode parentOfGroup =
+                  new VerticallyConcatNode(context.queryContext.getQueryId().genPlanNodeId());
               sourceNodes.forEach(parentOfGroup::addChild);
-              aggregationNode.addChild(parentOfGroup);
+              newRoot.addChild(parentOfGroup);
             }
           }
         });
 
-    return aggregationNode;
+    return newRoot;
   }
 
   @Override
@@ -660,9 +694,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       return defaultRewrite(root, context);
     }
     // Firstly, we build the tree structure for GroupByLevelNode
-    List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
     Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
-        sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+        splitAggregationSourceByPartition(root, context);
 
     boolean containsSlidingWindow =
         root.getChildren().size() == 1
@@ -684,9 +717,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
 
   @Override
   public PlanNode visitGroupByTag(GroupByTagNode root, DistributionPlanContext context) {
-    List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
     Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
-        sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+        splitAggregationSourceByPartition(root, context);
 
     boolean containsSlidingWindow =
         root.getChildren().size() == 1
@@ -730,11 +762,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
           if (sourceNodes.size() == 1) {
             parentOfGroup.addChild(sourceNodes.get(0));
           } else {
-            TimeJoinNode timeJoinNode =
-                new TimeJoinNode(
-                    context.queryContext.getQueryId().genPlanNodeId(), root.getScanOrder());
-            sourceNodes.forEach(timeJoinNode::addChild);
-            parentOfGroup.addChild(timeJoinNode);
+            VerticallyConcatNode verticallyConcatNode =
+                new VerticallyConcatNode(context.queryContext.getQueryId().genPlanNodeId());
+            sourceNodes.forEach(verticallyConcatNode::addChild);
+            parentOfGroup.addChild(verticallyConcatNode);
           }
           groups.add(parentOfGroup);
         });
@@ -871,11 +902,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
           if (sourceNodes.size() == 1) {
             parentOfGroup.addChild(sourceNodes.get(0));
           } else {
-            PlanNode timeJoinNode =
-                new TimeJoinNode(
-                    context.queryContext.getQueryId().genPlanNodeId(), root.getScanOrder());
-            sourceNodes.forEach(timeJoinNode::addChild);
-            parentOfGroup.addChild(timeJoinNode);
+            VerticallyConcatNode verticallyConcatNode =
+                new VerticallyConcatNode(context.queryContext.getQueryId().genPlanNodeId());
+            sourceNodes.forEach(verticallyConcatNode::addChild);
+            parentOfGroup.addChild(verticallyConcatNode);
           }
           newRoot.addChild(parentOfGroup);
         });
@@ -897,11 +927,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
               sourceNodes.forEach(newRoot::addChild);
               addParent[0] = true;
             } else {
-              PlanNode timeJoinNode =
-                  new TimeJoinNode(
-                      context.queryContext.getQueryId().genPlanNodeId(), root.getScanOrder());
-              sourceNodes.forEach(timeJoinNode::addChild);
-              newRoot.addChild(timeJoinNode);
+              VerticallyConcatNode verticallyConcatNode =
+                  new VerticallyConcatNode(context.queryContext.getQueryId().genPlanNodeId());
+              sourceNodes.forEach(verticallyConcatNode::addChild);
+              newRoot.addChild(verticallyConcatNode);
             }
           }
         });
@@ -916,45 +945,96 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return columnName.contains(expression.getExpressionString());
   }
 
-  private List<SeriesAggregationSourceNode> splitAggregationSourceByPartition(
-      PlanNode root, DistributionPlanContext context) {
+  /**
+   * This method is used for rewriting second step AggregationNode like GroupByLevelNode,
+   * GroupByTagNode and SlidingWindowAggregationNode, so the first step AggregationNode's step will
+   * be {@link AggregationStep#PARTIAL}
+   */
+  private Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>>
+      splitAggregationSourceByPartition(PlanNode root, DistributionPlanContext context) {
     // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
     List<SeriesAggregationSourceNode> rawSources = findAggregationSourceNode(root);
-    // Step 1: split SeriesAggregationSourceNode according to data partition
+
+    // Step 1: construct SeriesAggregationSourceNode for each data region of one Path
     List<SeriesAggregationSourceNode> sources = new ArrayList<>();
-    Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
     for (SeriesAggregationSourceNode child : rawSources) {
-      List<TRegionReplicaSet> dataDistribution =
-          analysis.getPartitionInfo(child.getPartitionPath(), child.getPartitionTimeFilter());
-      for (TRegionReplicaSet dataRegion : dataDistribution) {
-        SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) child.clone();
-        split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-        split.setRegionReplicaSet(dataRegion);
-        // Let each split reference different object of AggregationDescriptorList
-        split.setAggregationDescriptorList(
-            child.getAggregationDescriptorList().stream()
-                .map(AggregationDescriptor::deepClone)
-                .collect(Collectors.toList()));
-        sources.add(split);
-      }
-      regionCountPerSeries.put(child.getPartitionPath(), dataDistribution.size());
+      constructAggregationSourceNodeForPerRegion(context, sources, child);
     }
 
-    // Step 2: change the step for each SeriesAggregationSourceNode according to its split count
+    // Step 2: change step to PARTIAL for each SeriesAggregationSourceNode and update TypeProvider
     for (SeriesAggregationSourceNode source : sources) {
-      //        boolean isFinal = regionCountPerSeries.get(source.getPartitionPath()) == 1;
-      // TODO: (xingtanzjr) need to optimize this step later. We make it as Partial now.
-      boolean isFinal = false;
       source
           .getAggregationDescriptorList()
           .forEach(
               d -> {
-                d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL);
+                d.setStep(AggregationStep.PARTIAL);
                 LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
                     d, context.queryContext.getTypeProvider());
               });
     }
-    return sources;
+
+    // Step 3: group all SeriesAggregationSourceNode by each region
+    return sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+  }
+
+  private Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>>
+      splitAggregationSourceByPartition(
+          PlanNode root,
+          DistributionPlanContext context,
+          List<SeriesAggregationSourceNode> sources,
+          boolean[] eachSeriesOneRegion,
+          Map<PartialPath, Integer> regionCountPerSeries) {
+    // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
+    List<SeriesAggregationSourceNode> rawSources = findAggregationSourceNode(root);
+
+    // Step 1: construct SeriesAggregationSourceNode for each data region of one Path
+    for (SeriesAggregationSourceNode child : rawSources) {
+      regionCountPerSeries.put(
+          child.getPartitionPath(),
+          constructAggregationSourceNodeForPerRegion(context, sources, child));
+    }
+
+    // Step 2: change step to SINGLE or PARTIAL for each SeriesAggregationSourceNode and update
+    // TypeProvider
+    for (SeriesAggregationSourceNode source : sources) {
+      boolean isSingle = regionCountPerSeries.get(source.getPartitionPath()) == 1;
+      source
+          .getAggregationDescriptorList()
+          .forEach(
+              d -> {
+                if (isSingle) {
+                  d.setStep(AggregationStep.SINGLE);
+                } else {
+                  eachSeriesOneRegion[0] = false;
+                  d.setStep(AggregationStep.PARTIAL);
+                  LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+                      d, context.queryContext.getTypeProvider());
+                }
+              });
+    }
+
+    // Step 3: group all SeriesAggregationSourceNode by each region
+    return sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+  }
+
+  private int constructAggregationSourceNodeForPerRegion(
+      DistributionPlanContext context,
+      List<SeriesAggregationSourceNode> sources,
+      SeriesAggregationSourceNode child) {
+    List<TRegionReplicaSet> dataDistribution =
+        analysis.getPartitionInfo(child.getPartitionPath(), child.getPartitionTimeFilter());
+    for (TRegionReplicaSet dataRegion : dataDistribution) {
+      SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) child.clone();
+      split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+      split.setRegionReplicaSet(dataRegion);
+      // Give each split its own deep-cloned copy of the AggregationDescriptorList
+      split.setAggregationDescriptorList(
+          child.getAggregationDescriptorList().stream()
+              .map(AggregationDescriptor::deepClone)
+              .collect(Collectors.toList()));
+      sources.add(split);
+    }
+    return dataDistribution.size();
   }
 
   private List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode node) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index c929512dae..1ea9a94440 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -394,6 +395,13 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitVerticallyConcat(VerticallyConcatNode node, GraphContext context) {
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(String.format("VerticallyConcat-%s", node.getPlanNodeId().getId()));
+    return render(node, boxValue, context);
+  }
+
   private String printRegion(TRegionReplicaSet regionReplicaSet) {
     return String.format(
         "Partition: %s",
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index bad8725978..0f846ad25b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -150,7 +151,8 @@ public enum PlanNodeType {
   ROLLBACK_PRE_DEACTIVATE_TEMPLATE_NODE((short) 60),
   DEACTIVATE_TEMPLATE_NODE((short) 61),
   INTO((short) 62),
-  DEVICE_VIEW_INTO((short) 63);
+  DEVICE_VIEW_INTO((short) 63),
+  VERTICALLY_CONCAT((short) 64);
 
   public static final int BYTES = Short.BYTES;
 
@@ -327,6 +329,8 @@ public enum PlanNodeType {
         return IntoNode.deserialize(buffer);
       case 63:
         return DeviceViewIntoNode.deserialize(buffer);
+      case 64:
+        return VerticallyConcatNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 788649ffc8..8bf41c86ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -326,4 +327,8 @@ public abstract class PlanVisitor<R, C> {
   public R visitDeviceViewInto(DeviceViewIntoNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitVerticallyConcat(VerticallyConcatNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
new file mode 100644
index 0000000000..24760a4228
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * This node is responsible for merging two or more TsBlocks that have already been aligned. The
+ * TsBlocks must have the same time column or no time column at all, so their value columns can
+ * be merged directly without comparison.
+ */
+public class VerticallyConcatNode extends MultiChildProcessNode {
+
+  public VerticallyConcatNode(PlanNodeId id) {
+    super(id, new ArrayList<>());
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new VerticallyConcatNode(getPlanNodeId());
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return children.stream()
+        .map(PlanNode::getOutputColumnNames)
+        .flatMap(List::stream)
+        .distinct()
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitVerticallyConcat(this, context);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.VERTICALLY_CONCAT.serialize(byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.VERTICALLY_CONCAT.serialize(stream);
+  }
+
+  public static VerticallyConcatNode deserialize(ByteBuffer byteBuffer) {
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new VerticallyConcatNode(planNodeId);
+  }
+
+  @Override
+  public String toString() {
+    return "VerticallyConcatNode-" + this.getPlanNodeId();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    VerticallyConcatNode that = (VerticallyConcatNode) o;
+    return children.equals(that.children);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), children);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationStep.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationStep.java
index 9e7c3e701a..04cab22a27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationStep.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationStep.java
@@ -39,15 +39,15 @@ import java.nio.ByteBuffer;
 public enum AggregationStep {
 
   // input Raw, output Partial
-  PARTIAL(InputType.RAW, true),
+  PARTIAL(InputType.RAW, true, (byte) 0),
   // input Partial, output Final
-  FINAL(InputType.PARTIAL, false),
+  FINAL(InputType.PARTIAL, false, (byte) 1),
   // input Partial, output Partial
-  INTERMEDIATE(InputType.PARTIAL, true),
+  INTERMEDIATE(InputType.PARTIAL, true, (byte) 2),
   // input Raw, output Final
-  SINGLE(InputType.RAW, false),
+  SINGLE(InputType.RAW, false, (byte) 3),
   // input final, output final
-  STATIC(InputType.FINAL, false);
+  STATIC(InputType.FINAL, false, (byte) 4);
 
   private enum InputType {
     RAW,
@@ -57,10 +57,12 @@ public enum AggregationStep {
 
   private final InputType inputType;
   private final boolean outputPartial;
+  private final byte ordinal;
 
-  AggregationStep(InputType inputType, boolean outputPartial) {
+  AggregationStep(InputType inputType, boolean outputPartial, byte ordinal) {
     this.inputType = inputType;
     this.outputPartial = outputPartial;
+    this.ordinal = ordinal;
   }
 
   public boolean isInputRaw() {
@@ -80,27 +82,28 @@ public enum AggregationStep {
   }
 
   public void serialize(ByteBuffer byteBuffer) {
-    ReadWriteIOUtils.write(isInputRaw(), byteBuffer);
-    ReadWriteIOUtils.write(isOutputPartial(), byteBuffer);
+    ReadWriteIOUtils.write(ordinal, byteBuffer);
   }
 
   public void serialize(DataOutputStream stream) throws IOException {
-    ReadWriteIOUtils.write(isInputRaw(), stream);
-    ReadWriteIOUtils.write(isOutputPartial(), stream);
+    ReadWriteIOUtils.write(ordinal, stream);
   }
 
   public static AggregationStep deserialize(ByteBuffer byteBuffer) {
-    boolean isInputRaw = ReadWriteIOUtils.readBool(byteBuffer);
-    boolean isOutputPartial = ReadWriteIOUtils.readBool(byteBuffer);
-    if (isInputRaw && isOutputPartial) {
-      return AggregationStep.PARTIAL;
+    byte type = ReadWriteIOUtils.readByte(byteBuffer);
+    switch (type) {
+      case 0:
+        return AggregationStep.PARTIAL;
+      case 1:
+        return AggregationStep.FINAL;
+      case 2:
+        return AggregationStep.INTERMEDIATE;
+      case 3:
+        return AggregationStep.SINGLE;
+      case 4:
+        return AggregationStep.STATIC;
+      default:
+        throw new IllegalArgumentException("Invalid AggregationStep type: " + type);
     }
-    if (!isInputRaw && isOutputPartial) {
-      return AggregationStep.INTERMEDIATE;
-    }
-    if (isInputRaw) {
-      return AggregationStep.SINGLE;
-    }
-    return AggregationStep.FINAL;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/VerticallyConcatOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/VerticallyConcatOperatorTest.java
new file mode 100644
index 0000000000..988b95a172
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/VerticallyConcatOperatorTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.VerticallyConcatOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import io.airlift.units.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class VerticallyConcatOperatorTest {
+  private static final String VERTICALLY_CONCAT_OPERATOR_TEST_SG =
+      "root.VerticallyConcatOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas,
+        deviceIds,
+        seqResources,
+        unSeqResources,
+        VERTICALLY_CONCAT_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  @Test
+  public void batchTest1() { // two aggregation scans feed one VerticallyConcatOperator
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      allSensors.add("sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SeriesAggregationScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          3, new PlanNodeId("3"), VerticallyConcatOperator.class.getSimpleName());
+
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(
+              VERTICALLY_CONCAT_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      List<AggregationType> aggregationTypes =
+          Arrays.asList(AggregationType.COUNT, AggregationType.SUM, AggregationType.FIRST_VALUE);
+      GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 10, 1, 1, true);
+      List<Aggregator> aggregators = new ArrayList<>();
+      AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+          .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
+      SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+          new SeriesAggregationScanOperator(
+              planNodeId1,
+              measurementPath1,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(0), // registered above as id 1
+              aggregators,
+              initTimeRangeIterator(groupByTimeParameter, true, true),
+              null,
+              true,
+              groupByTimeParameter,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+      seriesAggregationScanOperator1.initQueryDataSource(
+          new QueryDataSource(seqResources, unSeqResources));
+      seriesAggregationScanOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(
+              VERTICALLY_CONCAT_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+      SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+          new SeriesAggregationScanOperator(
+              planNodeId2,
+              measurementPath2,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(1), // registered above as id 2
+              aggregators, // NOTE(review): same list as operator 1; confirm sharing is safe
+              initTimeRangeIterator(groupByTimeParameter, true, true),
+              null,
+              true,
+              groupByTimeParameter,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+      seriesAggregationScanOperator2.initQueryDataSource(
+          new QueryDataSource(seqResources, unSeqResources));
+      seriesAggregationScanOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      VerticallyConcatOperator verticallyConcatOperator =
+          new VerticallyConcatOperator(
+              fragmentInstanceContext.getOperatorContexts().get(2), // registered above as id 3
+              Arrays.asList(seriesAggregationScanOperator1, seriesAggregationScanOperator2),
+              Arrays.asList( // output types: 3 aggregate columns for each of the 2 children
+                  TSDataType.INT64,
+                  TSDataType.DOUBLE,
+                  TSDataType.INT32,
+                  TSDataType.INT64,
+                  TSDataType.DOUBLE,
+                  TSDataType.INT32));
+
+      int count = 0;
+      while (verticallyConcatOperator.hasNext()) {
+        TsBlock tsBlock = verticallyConcatOperator.next();
+        assertEquals(6, tsBlock.getValueColumnCount()); // 3 aggregations x 2 children
+        for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
+          assertEquals(count, tsBlock.getTimeByIndex(i)); // row time equals running row index
+          assertEquals(1, tsBlock.getColumn(0).getLong(i));
+          assertEquals(20000 + count, tsBlock.getColumn(1).getDouble(i), 0.00001);
+          assertEquals(20000 + count, tsBlock.getColumn(2).getInt(i));
+          assertEquals(1, tsBlock.getColumn(3).getLong(i));
+          assertEquals(20000 + count, tsBlock.getColumn(4).getDouble(i), 0.00001);
+          assertEquals(20000 + count, tsBlock.getColumn(5).getInt(i));
+        }
+      }
+      assertEquals(10, count); // total rows across all returned blocks
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown(); // release the pool even if an assertion fails
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 8074203fc2..991cebb099 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
@@ -144,14 +145,19 @@ public class AggregationDistributionTest {
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
     DistributedQueryPlan plan = planner.planFragments();
     assertEquals(3, plan.getInstances().size());
+
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    List<AggregationStep> expected = Arrays.asList(AggregationStep.STATIC, AggregationStep.FINAL);
+    verifyAggregationStep(expected, fragmentInstances.get(0).getFragment().getPlanNodeTree());
+
     Map<String, AggregationStep> expectedStep = new HashMap<>();
     expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
-    expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
-    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    expectedStep.put(d2s1Path, AggregationStep.SINGLE);
     fragmentInstances.forEach(
         f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
   }
 
+  // Checks each SeriesAggregationSourceNode's step against the expected per-path map.
   private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) {
     if (root == null) {
       return;
@@ -167,6 +173,22 @@ public class AggregationDistributionTest {
     root.getChildren().forEach(child -> verifyAggregationStep(expected, child));
   }
 
+  // Recursively asserts that every AggregationNode under root has exactly the expected steps.
+  private void verifyAggregationStep(List<AggregationStep> expected, PlanNode root) {
+    if (root == null) {
+      return;
+    }
+    if (root instanceof AggregationNode) {
+      List<AggregationStep> actual =
+          ((AggregationNode) root)
+              .getAggregationDescriptorList().stream()
+                  .map(AggregationDescriptor::getStep)
+                  .collect(Collectors.toList());
+      assertEquals(expected, actual); // order-sensitive list comparison
+    }
+    root.getChildren().forEach(child -> verifyAggregationStep(expected, child));
+  }
+
   @Test
   public void testTimeJoinAggregationWithSlidingWindow() throws IllegalPathException {
     QueryId queryId = new QueryId("test_query_time_join_agg_with_sliding");
@@ -391,10 +413,15 @@ public class AggregationDistributionTest {
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
     DistributedQueryPlan plan = planner.planFragments();
     assertEquals(2, plan.getInstances().size());
+
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    List<AggregationStep> expected = Arrays.asList(AggregationStep.FINAL, AggregationStep.FINAL);
+    verifyAggregationStep(expected, fragmentInstances.get(0).getFragment().getPlanNodeTree());
+
     Map<String, AggregationStep> expectedStep = new HashMap<>();
     expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
     expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
-    List<FragmentInstance> fragmentInstances = plan.getInstances();
+
     fragmentInstances.forEach(
         f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
 
@@ -707,7 +734,7 @@ public class AggregationDistributionTest {
     PlanNode f2Root =
         plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
     assertTrue(f1Root instanceof DeviceViewNode);
-    assertTrue(f2Root instanceof TimeJoinNode);
+    assertTrue(f2Root instanceof VerticallyConcatNode);
     assertTrue(f1Root.getChildren().get(0) instanceof AggregationNode);
     assertEquals(3, f1Root.getChildren().get(0).getChildren().size());
   }
@@ -731,9 +758,9 @@ public class AggregationDistributionTest {
     PlanNode f3Root =
         plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0);
     assertTrue(f1Root instanceof DeviceMergeNode);
-    assertTrue(f2Root instanceof TimeJoinNode);
+    assertTrue(f2Root instanceof VerticallyConcatNode);
     assertTrue(f3Root instanceof DeviceViewNode);
-    assertTrue(f3Root.getChildren().get(0) instanceof AggregationNode);
+    assertTrue(f3Root.getChildren().get(0) instanceof VerticallyConcatNode);
     assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
     assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof AggregationNode);
     assertEquals(3, f1Root.getChildren().get(0).getChildren().get(0).getChildren().size());
@@ -756,7 +783,7 @@ public class AggregationDistributionTest {
     PlanNode f2Root =
         plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
     assertTrue(f1Root instanceof DeviceViewNode);
-    assertTrue(f2Root instanceof TimeJoinNode);
+    assertTrue(f2Root instanceof VerticallyConcatNode);
     assertEquals(2, f1Root.getChildren().size());
   }
 
@@ -832,4 +859,33 @@ public class AggregationDistributionTest {
     DistributedQueryPlan plan = planner.planFragments();
     assertEquals(3, plan.getInstances().size());
   }
+
+  @Test
+  public void testEachSeriesOneRegion() {
+    QueryId queryId = new QueryId("test_each_series_1_region");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    String sql = "select count(s1), count(s2) from root.sg.d22, root.sg.d55555";
+    Analysis analysis = Util.analyze(sql, context);
+    PlanNode logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach( // first child of each fragment root is a VerticallyConcatNode
+        fragmentInstance ->
+            assertTrue(
+                fragmentInstance.getFragment().getPlanNodeTree().getChildren().get(0)
+                    instanceof VerticallyConcatNode));
+
+    Map<String, AggregationStep> expectedStep = new HashMap<>(); // all four scans run as SINGLE
+    expectedStep.put("root.sg.d22.s1", AggregationStep.SINGLE);
+    expectedStep.put("root.sg.d22.s2", AggregationStep.SINGLE);
+    expectedStep.put("root.sg.d55555.s1", AggregationStep.SINGLE);
+    expectedStep.put("root.sg.d55555.s2", AggregationStep.SINGLE);
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
+  }
 }