You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/17 01:58:16 UTC

[iotdb] branch ShuffleFeature updated: Shuffle sink

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ShuffleFeature
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ShuffleFeature by this push:
     new ea47360f53 Shuffle sink
ea47360f53 is described below

commit ea47360f5379f9b90a8a8b30a58d65174c2a556a
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Feb 17 09:58:10 2023 +0800

    Shuffle sink
---
 README.md                                          |   2 +-
 docs/UserGuide/API/Programming-Java-Native-API.md  |   2 +-
 .../Data-Concept/Data-Model-and-Terminology.md     |   2 +-
 .../UserGuide/API/Programming-Java-Native-API.md   |   2 +-
 .../Data-Concept/Data-Model-and-Terminology.md     |   2 +-
 .../iotdb/db/it/query/IoTDBShuffleSink1IT.java     | 150 +++++++++++
 .../iotdb/db/it/query/IoTDBShuffleSink2IT.java     | 154 +++++++++++
 .../iotdb/db/mpp/execution/driver/Driver.java      |   2 +-
 .../db/mpp/execution/driver/DriverContext.java     |   2 +-
 .../iotdb/db/mpp/execution/driver/IDriver.java     |   2 +-
 .../exchange/IMPPDataExchangeManager.java          |  32 ++-
 .../execution/exchange/MPPDataExchangeManager.java | 291 ++++++++++++--------
 .../mpp/execution/exchange/SharedTsBlockQueue.java |   6 +
 .../exchange/sink/DownStreamChannelIndex.java      |  37 +++
 .../exchange/sink/DownStreamChannelLocation.java   |  93 +++++++
 .../mpp/execution/exchange/sink/ISinkChannel.java  |  31 +++
 .../execution/exchange/{ => sink}/ISinkHandle.java |  40 +--
 .../exchange/{ => sink}/LocalSinkHandle.java       |  52 ++--
 .../execution/exchange/sink/ShuffleSinkHandle.java | 299 +++++++++++++++++++++
 .../execution/exchange/{ => sink}/SinkHandle.java  | 119 ++++----
 .../exchange/{ => source}/ISourceHandle.java       |   2 +-
 .../exchange/{ => source}/LocalSourceHandle.java   |   3 +-
 .../exchange/{ => source}/SourceHandle.java        |  22 +-
 .../fragment/FragmentInstanceExecution.java        |   2 +-
 .../fragment/FragmentInstanceManager.java          |   2 +-
 .../operator/sink/IdentitySinkOperator.java        | 132 +++++++++
 .../operator/sink/ShuffleHelperOperator.java       | 134 +++++++++
 .../operator/source/ExchangeOperator.java          |   2 +-
 .../db/mpp/execution/schedule/task/DriverTask.java |   2 +-
 .../db/mpp/plan/execution/QueryExecution.java      |   4 +-
 .../plan/execution/memory/MemorySourceHandle.java  |   2 +-
 .../plan/planner/LocalExecutionPlanContext.java    |   2 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  82 +++++-
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  10 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |  10 +
 .../planner/plan/node/process/ExchangeNode.java    |  15 ++
 .../planner/plan/node/sink/IdentitySinkNode.java   |  85 ++++++
 .../plan/node/sink/MultiChildrenSinkNode.java      | 110 ++++++++
 .../planner/plan/node/sink/ShuffleSinkNode.java    |  91 +++++++
 .../execution/exchange/LocalSinkHandleTest.java    |   2 +
 .../execution/exchange/LocalSourceHandleTest.java  |   1 +
 .../exchange/MPPDataExchangeManagerTest.java       |   8 +-
 .../db/mpp/execution/exchange/SinkHandleTest.java  |   1 +
 .../mpp/execution/exchange/SourceHandleTest.java   |   6 +
 .../db/mpp/execution/exchange/StubSinkHandle.java  |   6 +-
 .../plan/node/sink/IdentitySinkNodeSerdeTest.java  |  60 +++++
 .../plan/node/sink/ShuffleSinkNodeSerdeTest.java   |  59 ++++
 thrift/src/main/thrift/datanode.thrift             |   4 +
 48 files changed, 1938 insertions(+), 241 deletions(-)

diff --git a/README.md b/README.md
index 78409964fa..acf9f04792 100644
--- a/README.md
+++ b/README.md
@@ -272,7 +272,7 @@ IoTDB> SHOW DATABASES
 Total line number = 1
 ```
 
-After the database is set, we can use CREATE TIMESERIES to create a new timeseries. When creating a timeseries, we should define its data type and the encoding scheme. Here We create two timeseries:
+After the database is set, we can use CREATE TIMESERIES to create a new timeseries. When creating a timeseries, we should define its data type and the encoding scheme. Here we create two timeseries:
 
 ```
 IoTDB> CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN
diff --git a/docs/UserGuide/API/Programming-Java-Native-API.md b/docs/UserGuide/API/Programming-Java-Native-API.md
index bb8871f514..817223155d 100644
--- a/docs/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/UserGuide/API/Programming-Java-Native-API.md
@@ -47,7 +47,7 @@ In root directory:
 
 ## Syntax Convention
 
-- **IoTDB-SQL interface:** The input SQL parameter needs to conform to the [syntax conventions](../Reference/Syntax-Conventions.md) and be escaped for JAVA strings. For example, you need to add a backslash before the double-quotes. (That is: after JAVA escaping, it is consistent with the SQL statement executed on the command line.)
+- **IoTDB-SQL interface:** The input SQL parameter needs to conform to the [syntax conventions](../Syntax-Conventions/Literal-Values.md) and be escaped for JAVA strings. For example, you need to add a backslash before the double-quotes. (That is: after JAVA escaping, it is consistent with the SQL statement executed on the command line.)
 - **Other interfaces:**
   - The node names in path or path prefix as parameter: The node names which should be escaped by backticks (`) in the SQL statement,  escaping is required here.
   - Identifiers (such as template names) as parameters: The identifiers which should be escaped by backticks (`) in the SQL statement, and escaping is not required here.
diff --git a/docs/UserGuide/Data-Concept/Data-Model-and-Terminology.md b/docs/UserGuide/Data-Concept/Data-Model-and-Terminology.md
index 393630fd03..5fc169cbda 100644
--- a/docs/UserGuide/Data-Concept/Data-Model-and-Terminology.md
+++ b/docs/UserGuide/Data-Concept/Data-Model-and-Terminology.md
@@ -85,7 +85,7 @@ The following are the constraints on the `nodeName`:
   * [ 0-9 a-z A-Z _ ] (letters, numbers, underscore)
   * ['\u2E80'..'\u9FFF'] (Chinese characters)
 * In particular, if the system is deployed on a Windows machine, the database layer name will be case-insensitive. For example, creating both `root.ln` and `root.LN` at the same time is not allowed.
-* If you want to use special characters in `nodeName`, you can quote it with back quote, detailed information can be found here: [Syntax-Conventions](https://iotdb.apache.org/UserGuide/Master/Reference/Syntax-Conventions.html).
+* If you want to use special characters in `nodeName`, you can quote it with backquotes. Detailed information can be found in the Syntax-Conventions chapter: [Syntax-Conventions](https://iotdb.apache.org/UserGuide/Master/Syntax-Conventions/Literal-Values.html).
 
 ### Path Pattern
 
diff --git a/docs/zh/UserGuide/API/Programming-Java-Native-API.md b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
index ed79f577b1..f6e8004562 100644
--- a/docs/zh/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
@@ -50,7 +50,7 @@ mvn clean install -pl session -am -Dmaven.test.skip=true
 
 ## 语法说明
 
- - 对于 IoTDB-SQL 接口:传入的 SQL 参数需要符合 [语法规范](../Reference/Syntax-Conventions.md) ,并且针对 JAVA 字符串进行反转义,如双引号前需要加反斜杠。(即:经 JAVA 转义之后与命令行执行的 SQL 语句一致。) 
+ - 对于 IoTDB-SQL 接口:传入的 SQL 参数需要符合 [语法规范](../Syntax-Conventions/Literal-Values.md) ,并且针对 JAVA 字符串进行反转义,如双引号前需要加反斜杠。(即:经 JAVA 转义之后与命令行执行的 SQL 语句一致。) 
  - 对于其他接口: 
    - 经参数传入的路径或路径前缀中的节点: 在 SQL 语句中需要使用反引号(`)进行转义的,此处均需要进行转义。 
    - 经参数传入的标识符(如模板名):在 SQL 语句中需要使用反引号(`)进行转义的,均可以不用进行转义。
diff --git a/docs/zh/UserGuide/Data-Concept/Data-Model-and-Terminology.md b/docs/zh/UserGuide/Data-Concept/Data-Model-and-Terminology.md
index e559251312..59f313ad1b 100644
--- a/docs/zh/UserGuide/Data-Concept/Data-Model-and-Terminology.md
+++ b/docs/zh/UserGuide/Data-Concept/Data-Model-and-Terminology.md
@@ -88,7 +88,7 @@ wildcard
   * [ 0-9 a-z A-Z _ ] (字母,数字,下划线)
   * ['\u2E80'..'\u9FFF'] (UNICODE 中文字符)
 * 特别地,如果系统在 Windows 系统上部署,那么 database 路径结点名是大小写不敏感的。例如,同时创建`root.ln` 和 `root.LN` 是不被允许的。
-* 如果需要在路径结点名中用特殊字符,可以用反引号引用路径结点名,具体使用方法可以参考[语法约定](../Reference/Syntax-Conventions.md)。
+* 如果需要在路径结点名中用特殊字符,可以用反引号引用路径结点名,具体使用方法可以参考[语法约定](../Syntax-Conventions/Literal-Values.md)。
 
 ### 路径模式(Path Pattern)
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink1IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink1IT.java
new file mode 100644
index 0000000000..aff8691039
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink1IT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.query;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBShuffleSink1IT {
+  private static final String[] SINGLE_SERIES =
+      new String[] {
+        "create database root.single",
+        "insert into root.single.d1(time,s1,s2) values (1,2,2)",
+        "insert into root.single.d1(time,s1,s2) values (now(),3,3)",
+        "insert into root.single.d2(time,s1,s2) values (1,4,4)",
+        "insert into root.single.d2(time,s1,s2) values (now(),5,5)"
+      };
+  // two devices
+  private static final String[] MULTI_SERIES =
+      new String[] {
+        "create database root.sg",
+        "insert into root.sg.d1(time,s1,s2) values (1,2,2)",
+        "insert into root.sg.d1(time,s1,s2) values (now(),3,3)",
+        "insert into root.sg.d2(time,s1,s2) values (1,4,4)",
+        "insert into root.sg.d2(time,s1,s2) values (now(),5,5)"
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setDataRegionGroupExtensionPolicy("CUSTOM");
+    EnvFactory.getEnv().getConfig().getCommonConfig().setDefaultDataRegionGroupNumPerDatabase(2);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(SINGLE_SERIES);
+    prepareData(MULTI_SERIES);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testCountAlignByDeviceOrderByDeviceWithoutValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** align by device", expectedHeader1, retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** align by device", expectedHeader2, retArray2);
+  }
+
+  @Test
+  public void testCountAlignByDeviceOrderByDeviceWithValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,1"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.single.** where s1 <= 4 align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,1,1"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** where s1 <= 4 align by device",
+        expectedHeader2,
+        retArray2);
+  }
+
+  @Test
+  public void testCountAlignByDeviceOrderByTimeWithoutValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.single.** order by time align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** order by time align by device",
+        expectedHeader2,
+        retArray2);
+  }
+
+  @Test
+  public void testCountAlignByDeviceOrderByTimeWithValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,1"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.single.** where s1 <= 4 order by time align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,1,1"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** where s1 <= 4 order by time align by device",
+        expectedHeader2,
+        retArray2);
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink2IT.java
new file mode 100644
index 0000000000..3be25557f4
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink2IT.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.query;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBShuffleSink2IT {
+  private static final String[] SINGLE_SERIES = // one measurement (s1) on each of three devices
+      new String[] {
+        "create database root.single",
+        "insert into root.single.d1(time,s1) values (1,1)", // one value per listed measurement
+        "insert into root.single.d1(time,s1) values (now(),2)",
+        "insert into root.single.d2(time,s1) values (now(),3)",
+        "insert into root.single.d2(time,s1) values (1,4)",
+        "insert into root.single.d3(time,s1) values (now(),5)",
+        "insert into root.single.d3(time,s1) values (1,6)"
+      };
+  // three devices, three data regions
+  private static final String[] MULTI_SERIES =
+      new String[] {
+        "create database root.sg",
+        "insert into root.sg.d1(time,s1,s2) values (1,1,1)",
+        "insert into root.sg.d1(time,s1,s2) values (now(),2,2)",
+        "insert into root.sg.d2(time,s1,s2) values (now(),3,3)",
+        "insert into root.sg.d2(time,s1,s2) values (1,4,4)",
+        "insert into root.sg.d3(time,s1,s2) values (now(),5,5)",
+        "insert into root.sg.d3(time,s1,s2) values (1,6,6)"
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setDataRegionGroupExtensionPolicy("CUSTOM");
+    EnvFactory.getEnv().getConfig().getCommonConfig().setDefaultDataRegionGroupNumPerDatabase(3);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(SINGLE_SERIES);
+    prepareData(MULTI_SERIES);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testCountAlignByDeviceOrderByDeviceWithoutValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2", "root.single.d3,2"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** align by device", expectedHeader1, retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2", "root.sg.d3,2,2"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** align by device", expectedHeader2, retArray2);
+  }
+
+  @Test
+  public void testCountAlignByDeviceOrderByDeviceWithValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2", "root.single.d3,0"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.single.** where s1 <= 4 align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2", "root.sg.d3,0,0"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** where s1 <= 4 align by device",
+        expectedHeader2,
+        retArray2);
+  }
+
+  @Test
+  public void testCountAlignByDeviceOrderByTimeWithoutValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2", "root.single.d3,2"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** order by time align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2", "root.sg.d3,2,2"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** order by time align by device",
+        expectedHeader2,
+        retArray2);
+  }
+
+  @Test
+  public void testCountAlignByDeviceOrderByTimeWithValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,1", "root.single.d3,0"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.single.** where s1 <= 4 order by time align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,1,1", "root.sg.d3,0,0"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.** where s1 <= 4 order by time align by device",
+        expectedHeader2,
+        retArray2);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index a1ab1c18db..9b62d2faf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.driver;
 
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index ea369d8fad..e0a221d599 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.driver;
 
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index ff55d5456c..0c3a5c07e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.execution.driver;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
 
 import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
index c02595374c..666e0709e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
@@ -20,9 +20,17 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ShuffleSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
+import java.util.List;
+
 public interface IMPPDataExchangeManager {
   /**
    * Create a sink handle who sends data blocks to a remote downstream fragment instance in async
@@ -30,26 +38,17 @@ public interface IMPPDataExchangeManager {
    *
    * @param localFragmentInstanceId ID of the local fragment instance who generates and sends data
    *     blocks to the sink handle.
-   * @param remoteEndpoint Hostname and Port of the remote fragment instance where the data blocks
-   *     should be sent to.
-   * @param remotePlanNodeId The sink plan node ID of the remote fragment instance.
-   * @param remotePlanNodeId The plan node ID of the local fragment instance.
    * @param instanceContext The context of local fragment instance.
    */
-  ISinkHandle createSinkHandle(
+  ISinkHandle createShuffleSinkHandle(
+      List<DownStreamChannelLocation> downStreamChannelLocationList,
+      DownStreamChannelIndex downStreamChannelIndex,
+      ShuffleSinkHandle.ShuffleStrategyEnum shuffleStrategyEnum,
       TFragmentInstanceId localFragmentInstanceId,
-      TEndPoint remoteEndpoint,
-      TFragmentInstanceId remoteFragmentInstanceId,
-      String remotePlanNodeId,
       String localPlanNodeId,
       FragmentInstanceContext instanceContext);
 
-  ISinkHandle createLocalSinkHandleForFragment(
-      TFragmentInstanceId localFragmentInstanceId,
-      TFragmentInstanceId remoteFragmentInstanceId,
-      String remotePlanNodeId,
-      FragmentInstanceContext instanceContext);
-
+  ISinkHandle createLocalSinkHandleForPipeline(DriverContext driverContext, String planNodeId);
   /**
    * Create a source handle who fetches data blocks from a remote upstream fragment instance for a
    * plan node of a local fragment instance in async manner.
@@ -65,6 +64,7 @@ public interface IMPPDataExchangeManager {
   ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
+      int indexOfUpstreamSinkHandle,
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback);
@@ -73,8 +73,12 @@ public interface IMPPDataExchangeManager {
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       TFragmentInstanceId remoteFragmentInstanceId,
+      int index,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback);
 
+  /** SharedTsBlockQueue must belong to corresponding LocalSinkHandle */
+  ISourceHandle createLocalSourceHandleForPipeline(SharedTsBlockQueue queue, DriverContext context);
+
   /**
    * Release all the related resources of a fragment instance, including data blocks that are not
    * yet fetched by downstream fragment instances.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index f51c4b77cb..0b116a6cbc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -23,6 +23,15 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ShuffleSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.SinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -43,13 +52,16 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
+import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode;
 import static org.apache.iotdb.db.mpp.common.FragmentInstanceId.createFullId;
 import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.GET_DATA_BLOCK_TASK_SERVER;
 import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER;
@@ -60,25 +72,9 @@ import static org.apache.iotdb.db.mpp.metric.DataExchangeCountMetricSet.SEND_NEW
 
 public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
-  private static final Logger logger = LoggerFactory.getLogger(MPPDataExchangeManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(MPPDataExchangeManager.class);
 
-  public interface SourceHandleListener {
-    void onFinished(ISourceHandle sourceHandle);
-
-    void onAborted(ISourceHandle sourceHandle);
-
-    void onFailure(ISourceHandle sourceHandle, Throwable t);
-  }
-
-  public interface SinkHandleListener {
-    void onFinish(ISinkHandle sinkHandle);
-
-    void onEndOfBlocks(ISinkHandle sinkHandle);
-
-    Optional<Throwable> onAborted(ISinkHandle sinkHandle);
-
-    void onFailure(ISinkHandle sinkHandle, Throwable t);
-  }
+  // region =========== MPPDataExchangeServiceImpl ===========
 
   /** Handle thrift communications. */
   class MPPDataExchangeServiceImpl implements MPPDataExchangeService.Iface {
@@ -94,18 +90,23 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
                   req.sourceFragmentInstanceId.queryId,
                   req.sourceFragmentInstanceId.fragmentId,
                   req.sourceFragmentInstanceId.instanceId))) {
-        logger.debug(
+        LOGGER.debug(
             "[ProcessGetTsBlockRequest] sequence ID in [{}, {})",
             req.getStartSequenceId(),
             req.getEndSequenceId());
-        if (!sinkHandles.containsKey(req.getSourceFragmentInstanceId())) {
+        if (!shuffleSinkHandles.containsKey(req.getSourceFragmentInstanceId())) {
           throw new TException(
               "Source fragment instance not found. Fragment instance ID: "
                   + req.getSourceFragmentInstanceId()
                   + ".");
         }
         TGetDataBlockResponse resp = new TGetDataBlockResponse();
-        SinkHandle sinkHandle = (SinkHandle) sinkHandles.get(req.getSourceFragmentInstanceId());
+        // the channel at the requested index must be a SinkHandle
+        SinkHandle sinkHandle =
+            (SinkHandle)
+                (shuffleSinkHandles
+                    .get(req.getSourceFragmentInstanceId())
+                    .getChannel(req.getIndex()));
         for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
           try {
             ByteBuffer serializedTsBlock = sinkHandle.getSerializedTsBlock(i);
@@ -132,21 +133,23 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
                   e.sourceFragmentInstanceId.queryId,
                   e.sourceFragmentInstanceId.fragmentId,
                   e.sourceFragmentInstanceId.instanceId))) {
-        logger.debug(
+        LOGGER.debug(
             "Acknowledge data block event received, for data blocks whose sequence ID in [{}, {}) from {}.",
             e.getStartSequenceId(),
             e.getEndSequenceId(),
             e.getSourceFragmentInstanceId());
-        if (!sinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
-          logger.debug(
+        if (!shuffleSinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
+          LOGGER.debug(
               "received ACK event but target FragmentInstance[{}] is not found.",
               e.getSourceFragmentInstanceId());
           return;
         }
-        ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
+        // the channel at the requested index must be a SinkHandle
+        ((SinkHandle)
+                (shuffleSinkHandles.get(e.getSourceFragmentInstanceId()).getChannel(e.getIndex())))
             .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
       } catch (Throwable t) {
-        logger.warn(
+        LOGGER.warn(
             "ack TsBlock [{}, {}) failed.", e.getStartSequenceId(), e.getEndSequenceId(), t);
         throw t;
       } finally {
@@ -162,7 +165,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       long startTime = System.nanoTime();
       try (SetThreadName fragmentInstanceName =
           new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) {
-        logger.debug(
+        LOGGER.debug(
             "New data block event received, for plan node {} of {} from {}.",
             e.getTargetPlanNodeId(),
             e.getTargetFragmentInstanceId(),
@@ -180,7 +183,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           // may
           // have already been stopped. For example, in the query whit LimitOperator, the downstream
           // FragmentInstance may be finished, although the upstream is still working.
-          logger.debug(
+          LOGGER.debug(
               "received NewDataBlockEvent but the downstream FragmentInstance[{}] is not found",
               e.getTargetFragmentInstanceId());
           return;
@@ -198,7 +201,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     public void onEndOfDataBlockEvent(TEndOfDataBlockEvent e) throws TException {
       try (SetThreadName fragmentInstanceName =
           new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) {
-        logger.debug(
+        LOGGER.debug(
             "End of data block event received, for plan node {} of {} from {}.",
             e.getTargetPlanNodeId(),
             e.getTargetFragmentInstanceId(),
@@ -212,7 +215,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
                 : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
 
         if (sourceHandle == null || sourceHandle.isAborted() || sourceHandle.isFinished()) {
-          logger.debug(
+          LOGGER.debug(
               "received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found",
               e.getTargetFragmentInstanceId());
           return;
@@ -223,6 +226,28 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     }
   }
 
+  // endregion
+
+  // region =========== listener ===========
+
+  public interface SourceHandleListener {
+    void onFinished(ISourceHandle sourceHandle);
+
+    void onAborted(ISourceHandle sourceHandle);
+
+    void onFailure(ISourceHandle sourceHandle, Throwable t);
+  }
+
+  public interface SinkHandleListener {
+    void onFinish(ISinkHandle sinkHandle);
+
+    void onEndOfBlocks(ISinkHandle sinkHandle);
+
+    Optional<Throwable> onAborted(ISinkHandle sinkHandle);
+
+    void onFailure(ISinkHandle sinkHandle, Throwable t);
+  }
+
   /** Listen to the state changes of a source handle. */
   class SourceHandleListenerImpl implements SourceHandleListener {
 
@@ -234,12 +259,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
     @Override
     public void onFinished(ISourceHandle sourceHandle) {
-      logger.debug("[ScHListenerOnFinish]");
+      LOGGER.debug("[ScHListenerOnFinish]");
       Map<String, ISourceHandle> sourceHandleMap =
           sourceHandles.get(sourceHandle.getLocalFragmentInstanceId());
       if (sourceHandleMap == null
           || sourceHandleMap.remove(sourceHandle.getLocalPlanNodeId()) == null) {
-        logger.debug("[ScHListenerAlreadyReleased]");
+        LOGGER.debug("[ScHListenerAlreadyReleased]");
       }
 
       if (sourceHandleMap != null && sourceHandleMap.isEmpty()) {
@@ -249,13 +274,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
     @Override
     public void onAborted(ISourceHandle sourceHandle) {
-      logger.debug("[ScHListenerOnAbort]");
+      LOGGER.debug("[ScHListenerOnAbort]");
       onFinished(sourceHandle);
     }
 
     @Override
     public void onFailure(ISourceHandle sourceHandle, Throwable t) {
-      logger.warn("Source handle failed due to: ", t);
+      LOGGER.warn("Source handle failed due to: ", t);
       if (onFailureCallback != null) {
         onFailureCallback.call(t);
       }
@@ -277,17 +302,17 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
     @Override
     public void onFinished(ISourceHandle sourceHandle) {
-      logger.debug("[ScHListenerOnFinish]");
+      LOGGER.debug("[ScHListenerOnFinish]");
     }
 
     @Override
     public void onAborted(ISourceHandle sourceHandle) {
-      logger.debug("[ScHListenerOnAbort]");
+      LOGGER.debug("[ScHListenerOnAbort]");
     }
 
     @Override
     public void onFailure(ISourceHandle sourceHandle, Throwable t) {
-      logger.warn("Source handle failed due to: ", t);
+      LOGGER.warn("Source handle failed due to: ", t);
       if (onFailureCallback != null) {
         onFailureCallback.call(t);
       }
@@ -295,12 +320,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
   }
 
   /** Listen to the state changes of a sink handle. */
-  class SinkHandleListenerImpl implements SinkHandleListener {
+  class ShuffleSinkHandleListenerImpl implements SinkHandleListener {
 
     private final FragmentInstanceContext context;
     private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;
 
-    public SinkHandleListenerImpl(
+    public ShuffleSinkHandleListenerImpl(
         FragmentInstanceContext context,
         IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
       this.context = context;
@@ -309,36 +334,28 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
     @Override
     public void onFinish(ISinkHandle sinkHandle) {
-      logger.debug("[SkHListenerOnFinish]");
-      removeFromMPPDataExchangeManager(sinkHandle);
+      LOGGER.debug("[ShuffleSinkHandleListenerOnFinish]");
+      shuffleSinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
       context.finished();
     }
 
     @Override
     public void onEndOfBlocks(ISinkHandle sinkHandle) {
-      logger.debug("[SkHListenerOnEndOfTsBlocks]");
+      LOGGER.debug("[ShuffleSinkHandleListenerOnEndOfTsBlocks]");
       context.transitionToFlushing();
     }
 
     @Override
     public Optional<Throwable> onAborted(ISinkHandle sinkHandle) {
-      logger.debug("[SkHListenerOnAbort]");
-      removeFromMPPDataExchangeManager(sinkHandle);
+      LOGGER.debug("[ShuffleSinkHandleListenerOnAbort]");
+      shuffleSinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
       return context.getFailureCause();
     }
 
-    private void removeFromMPPDataExchangeManager(ISinkHandle sinkHandle) {
-      if (sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()) == null) {
-        logger.debug("[RemoveNoSinkHandle]");
-      } else {
-        logger.debug("[RemoveSinkHandle]");
-      }
-    }
-
     @Override
     public void onFailure(ISinkHandle sinkHandle, Throwable t) {
       // TODO: (xingtanzjr) should we remove the sinkHandle from MPPDataExchangeManager ?
-      logger.warn("Sink handle failed due to", t);
+      LOGGER.warn("Sink handle failed due to", t);
       if (onFailureCallback != null) {
         onFailureCallback.call(t);
       }
@@ -350,12 +367,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    * handle doesn't equal the finish of the whole fragment, therefore we don't need to notify
    * fragment context. But if it's aborted or failed, it can lead to the total fail.
    */
-  static class PipelineSinkHandleListenerImpl implements SinkHandleListener {
+  static class SinkHandleListenerImpl implements SinkHandleListener {
 
     private final FragmentInstanceContext context;
     private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;
 
-    public PipelineSinkHandleListenerImpl(
+    public SinkHandleListenerImpl(
         FragmentInstanceContext context,
         IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
       this.context = context;
@@ -364,36 +381,42 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
     @Override
     public void onFinish(ISinkHandle sinkHandle) {
-      logger.debug("[SkHListenerOnFinish]");
+      LOGGER.debug("[SkHListenerOnFinish]");
     }
 
     @Override
     public void onEndOfBlocks(ISinkHandle sinkHandle) {
-      logger.debug("[SkHListenerOnEndOfTsBlocks]");
+      LOGGER.debug("[SkHListenerOnEndOfTsBlocks]");
     }
 
     @Override
     public Optional<Throwable> onAborted(ISinkHandle sinkHandle) {
-      logger.debug("[SkHListenerOnAbort]");
+      LOGGER.debug("[SkHListenerOnAbort]");
       return context.getFailureCause();
     }
 
     @Override
     public void onFailure(ISinkHandle sinkHandle, Throwable t) {
-      logger.warn("Sink handle failed due to", t);
+      LOGGER.warn("Sink handle failed due to", t);
       if (onFailureCallback != null) {
         onFailureCallback.call(t);
       }
     }
   }
 
+  // endregion
+
+  // region =========== MPPDataExchangeManager ===========
+
   private final LocalMemoryManager localMemoryManager;
   private final Supplier<TsBlockSerde> tsBlockSerdeFactory;
   private final ExecutorService executorService;
   private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
       mppDataExchangeServiceClientManager;
   private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles;
-  private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles;
+
+  /** Each FragmentInstance (FI) has only one ShuffleSinkHandle. */
+  private final Map<TFragmentInstanceId, ISinkHandle> shuffleSinkHandles;
 
   private MPPDataExchangeServiceImpl mppDataExchangeService;
 
@@ -409,7 +432,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     this.mppDataExchangeServiceClientManager =
         Validate.notNull(mppDataExchangeServiceClientManager);
     sourceHandles = new ConcurrentHashMap<>();
-    sinkHandles = new ConcurrentHashMap<>();
+    shuffleSinkHandles = new ConcurrentHashMap<>();
   }
 
   public MPPDataExchangeServiceImpl getOrCreateMPPDataExchangeServiceImpl() {
@@ -419,7 +442,6 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     return mppDataExchangeService;
   }
 
-  @Override
   public synchronized ISinkHandle createLocalSinkHandleForFragment(
       TFragmentInstanceId localFragmentInstanceId,
       TFragmentInstanceId remoteFragmentInstanceId,
@@ -427,12 +449,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       // TODO: replace with callbacks to decouple MPPDataExchangeManager from
       // FragmentInstanceContext
       FragmentInstanceContext instanceContext) {
-    if (sinkHandles.containsKey(localFragmentInstanceId)) {
-      throw new IllegalStateException(
-          "Local sink handle for " + localFragmentInstanceId + " exists.");
-    }
 
-    logger.debug(
+    LOGGER.debug(
         "Create local sink handle to plan node {} of {} for {}",
         remotePlanNodeId,
         remoteFragmentInstanceId,
@@ -443,23 +461,20 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     LocalSourceHandle localSourceHandle =
         sourceHandleMap == null ? null : (LocalSourceHandle) sourceHandleMap.get(remotePlanNodeId);
     if (localSourceHandle != null) {
-      logger.debug("Get shared tsblock queue from local source handle");
+      LOGGER.debug("Get SharedTsBlockQueue from local source handle");
       queue =
           ((LocalSourceHandle) sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
               .getSharedTsBlockQueue();
     } else {
-      logger.debug("Create shared tsblock queue");
+      LOGGER.debug("Create SharedTsBlockQueue");
       queue =
           new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager);
     }
 
-    LocalSinkHandle localSinkHandle =
-        new LocalSinkHandle(
-            localFragmentInstanceId,
-            queue,
-            new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
-    sinkHandles.put(localFragmentInstanceId, localSinkHandle);
-    return localSinkHandle;
+    return new LocalSinkHandle(
+        localFragmentInstanceId,
+        queue,
+        new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
   }
 
   /**
@@ -468,7 +483,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    */
   public ISinkHandle createLocalSinkHandleForPipeline(
       DriverContext driverContext, String planNodeId) {
-    logger.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
+    LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
     SharedTsBlockQueue queue =
         new SharedTsBlockQueue(
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
@@ -476,11 +491,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             localMemoryManager);
     return new LocalSinkHandle(
         queue,
-        new PipelineSinkHandleListenerImpl(
+        new SinkHandleListenerImpl(
             driverContext.getFragmentInstanceContext(), driverContext::failed));
   }
 
-  @Override
   public ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
       TEndPoint remoteEndpoint,
@@ -490,30 +504,84 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       // TODO: replace with callbacks to decouple MPPDataExchangeManager from
       // FragmentInstanceContext
       FragmentInstanceContext instanceContext) {
-    if (sinkHandles.containsKey(localFragmentInstanceId)) {
-      throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
-    }
 
-    logger.debug(
+    LOGGER.debug(
         "Create sink handle to plan node {} of {} for {}",
         remotePlanNodeId,
         remoteFragmentInstanceId,
         localFragmentInstanceId);
 
-    SinkHandle sinkHandle =
-        new SinkHandle(
-            remoteEndpoint,
-            remoteFragmentInstanceId,
-            remotePlanNodeId,
-            localPlanNodeId,
+    return new SinkHandle(
+        remoteEndpoint,
+        remoteFragmentInstanceId,
+        remotePlanNodeId,
+        localPlanNodeId,
+        localFragmentInstanceId,
+        localMemoryManager,
+        executorService,
+        tsBlockSerdeFactory.get(),
+        new SinkHandleListenerImpl(instanceContext, instanceContext::failed),
+        mppDataExchangeServiceClientManager);
+  }
+
+  @Override
+  public ISinkHandle createShuffleSinkHandle(
+      List<DownStreamChannelLocation> downStreamChannelLocationList,
+      DownStreamChannelIndex downStreamChannelIndex,
+      ShuffleSinkHandle.ShuffleStrategyEnum shuffleStrategyEnum,
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
+      // TODO: replace with callbacks to decouple MPPDataExchangeManager from
+      // FragmentInstanceContext
+      FragmentInstanceContext instanceContext) {
+    if (shuffleSinkHandles.containsKey(localFragmentInstanceId)) {
+      throw new IllegalStateException(
+          "ShuffleSinkHandle for " + localFragmentInstanceId + " exists.");
+    }
+
+    List<ISinkHandle> downStreamChannelList =
+        downStreamChannelLocationList.stream()
+            .map(
+                downStreamChannelLocation ->
+                    createHandleForShuffleSink(
+                        localFragmentInstanceId,
+                        localPlanNodeId,
+                        downStreamChannelLocation,
+                        instanceContext))
+            .collect(Collectors.toList());
+
+    ShuffleSinkHandle shuffleSinkHandle =
+        new ShuffleSinkHandle(
             localFragmentInstanceId,
-            localMemoryManager,
-            executorService,
-            tsBlockSerdeFactory.get(),
-            new SinkHandleListenerImpl(instanceContext, instanceContext::failed),
-            mppDataExchangeServiceClientManager);
-    sinkHandles.put(localFragmentInstanceId, sinkHandle);
-    return sinkHandle;
+            downStreamChannelList,
+            downStreamChannelIndex,
+            shuffleStrategyEnum,
+            localPlanNodeId,
+            new ShuffleSinkHandleListenerImpl(instanceContext, instanceContext::failed));
+    shuffleSinkHandles.put(localFragmentInstanceId, shuffleSinkHandle);
+    return shuffleSinkHandle;
+  }
+
+  private ISinkHandle createHandleForShuffleSink(
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
+      DownStreamChannelLocation downStreamChannelLocation,
+      FragmentInstanceContext instanceContext) {
+    if (isSameNode(downStreamChannelLocation.getRemoteEndpoint())) {
+      return createLocalSinkHandleForFragment(
+          localFragmentInstanceId,
+          downStreamChannelLocation.getRemoteFragmentInstanceId(),
+          downStreamChannelLocation.getRemotePlanNodeId(),
+          instanceContext);
+    } else {
+      return createSinkHandle(
+          localFragmentInstanceId,
+          downStreamChannelLocation.getRemoteEndpoint(),
+          downStreamChannelLocation.getRemoteFragmentInstanceId(),
+          downStreamChannelLocation.getRemotePlanNodeId(),
+          localPlanNodeId,
+          instanceContext);
+    }
   }
 
   /**
@@ -522,18 +590,18 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    */
   public ISourceHandle createLocalSourceHandleForPipeline(
       SharedTsBlockQueue queue, DriverContext context) {
-    logger.debug("Create local source handle for {}", context.getDriverTaskID());
+    LOGGER.debug("Create local source handle for {}", context.getDriverTaskID());
     return new LocalSourceHandle(
         queue,
         new PipelineSourceHandleListenerImpl(context::failed),
         context.getDriverTaskID().toString());
   }
 
-  @Override
   public synchronized ISourceHandle createLocalSourceHandleForFragment(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       TFragmentInstanceId remoteFragmentInstanceId,
+      int index,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
     if (sourceHandles.containsKey(localFragmentInstanceId)
         && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
@@ -545,17 +613,19 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
               + " exists.");
     }
 
-    logger.debug(
+    LOGGER.debug(
         "Create local source handle from {} for plan node {} of {}",
         remoteFragmentInstanceId,
         localPlanNodeId,
         localFragmentInstanceId);
     SharedTsBlockQueue queue;
-    if (sinkHandles.containsKey(remoteFragmentInstanceId)) {
-      logger.debug("Get shared tsblock queue from local sink handle");
-      queue = ((LocalSinkHandle) sinkHandles.get(remoteFragmentInstanceId)).getSharedTsBlockQueue();
+    if (shuffleSinkHandles.containsKey(remoteFragmentInstanceId)) {
+      LOGGER.debug("Get SharedTsBlockQueue from local sink handle");
+      queue =
+          ((LocalSinkHandle) shuffleSinkHandles.get(remoteFragmentInstanceId).getChannel(index))
+              .getSharedTsBlockQueue();
     } else {
-      logger.debug("Create shared tsblock queue");
+      LOGGER.debug("Create SharedTsBlockQueue");
       queue = new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, localMemoryManager);
     }
     LocalSourceHandle localSourceHandle =
@@ -574,6 +644,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
   public ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
+      int indexOfUpstreamSinkHandle,
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
@@ -587,7 +658,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
               + " exists.");
     }
 
-    logger.debug(
+    LOGGER.debug(
         "Create source handle from {} for plan node {} of {}",
         remoteFragmentInstanceId,
         localPlanNodeId,
@@ -599,6 +670,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
+            indexOfUpstreamSinkHandle,
             localMemoryManager,
             executorService,
             tsBlockSerdeFactory.get(),
@@ -617,21 +689,21 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    * <p>This method should be called when a fragment instance finished in an abnormal state.
    */
   public void forceDeregisterFragmentInstance(TFragmentInstanceId fragmentInstanceId) {
-    logger.debug("[StartForceReleaseFIDataExchangeResource]");
-    ISinkHandle sinkHandle = sinkHandles.get(fragmentInstanceId);
+    LOGGER.debug("[StartForceReleaseFIDataExchangeResource]");
+    ISinkHandle sinkHandle = shuffleSinkHandles.get(fragmentInstanceId);
     if (sinkHandle != null) {
       sinkHandle.abort();
-      sinkHandles.remove(fragmentInstanceId);
+      shuffleSinkHandles.remove(fragmentInstanceId);
     }
     Map<String, ISourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId);
     if (planNodeIdToSourceHandle != null) {
       for (Entry<String, ISourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
-        logger.debug("[CloseSourceHandle] {}", entry.getKey());
+        LOGGER.debug("[CloseSourceHandle] {}", entry.getKey());
         entry.getValue().abort();
       }
       sourceHandles.remove(fragmentInstanceId);
     }
-    logger.debug("[EndForceReleaseFIDataExchangeResource]");
+    LOGGER.debug("[EndForceReleaseFIDataExchangeResource]");
   }
 
   /** @param suffix should be like [PlanNodeId].SourceHandle/SinHandle */
@@ -643,4 +715,5 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         + "."
         + suffix;
   }
+  // endregion
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index a40202de46..46048fdb96 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -122,6 +124,10 @@ public class SharedTsBlockQueue {
     return queue.isEmpty();
   }
 
+  public int getNumOfBufferedTsBlocks() {
+    return queue.size();
+  }
+
   public void setSinkHandle(LocalSinkHandle sinkHandle) {
     this.sinkHandle = sinkHandle;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelIndex.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelIndex.java
new file mode 100644
index 0000000000..cd36b18ae6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelIndex.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
+
+public class DownStreamChannelIndex {
+  /** Current index of the downstream ISourceHandle; updated via {@link #setCurrentIndex}. */
+  private int currentIndex;
+
+  public DownStreamChannelIndex(int currentIndex) {
+    this.currentIndex = currentIndex;
+  }
+
+  public int getCurrentIndex() {
+    return currentIndex;
+  }
+
+  public void setCurrentIndex(int currentIndex) {
+    this.currentIndex = currentIndex;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelLocation.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelLocation.java
new file mode 100644
index 0000000000..83a74b0d55
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelLocation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DownStreamChannelLocation {
+
+  private final TEndPoint remoteEndpoint;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remotePlanNodeId;
+
+  /**
+   * @param remoteEndpoint Hostname and Port of the remote fragment instance where the data blocks
+   *     should be sent to.
+   * @param remoteFragmentInstanceId The ID of the remote fragment instance.
+   * @param remotePlanNodeId The plan node ID of the remote exchangeNode.
+   */
+  public DownStreamChannelLocation(
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remotePlanNodeId) {
+    this.remoteEndpoint = remoteEndpoint;
+    this.remoteFragmentInstanceId = remoteFragmentInstanceId;
+    this.remotePlanNodeId = remotePlanNodeId;
+  }
+
+  public TEndPoint getRemoteEndpoint() {
+    return remoteEndpoint;
+  }
+
+  public TFragmentInstanceId getRemoteFragmentInstanceId() {
+    return remoteFragmentInstanceId;
+  }
+
+  public String getRemotePlanNodeId() {
+    return remotePlanNodeId;
+  }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(remoteEndpoint.getIp(), byteBuffer);
+    ReadWriteIOUtils.write(remoteEndpoint.getPort(), byteBuffer);
+    ReadWriteIOUtils.write(remoteFragmentInstanceId.getInstanceId(), byteBuffer);
+    ReadWriteIOUtils.write(remoteFragmentInstanceId.getFragmentId(), byteBuffer);
+    ReadWriteIOUtils.write(remoteFragmentInstanceId.getQueryId(), byteBuffer);
+    ReadWriteIOUtils.write(remotePlanNodeId, byteBuffer);
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(remoteEndpoint.getIp(), stream);
+    ReadWriteIOUtils.write(remoteEndpoint.getPort(), stream);
+    ReadWriteIOUtils.write(remoteFragmentInstanceId.getInstanceId(), stream);
+    ReadWriteIOUtils.write(remoteFragmentInstanceId.getFragmentId(), stream);
+    ReadWriteIOUtils.write(remoteFragmentInstanceId.getQueryId(), stream);
+    ReadWriteIOUtils.write(remotePlanNodeId, stream);
+  }
+
+  public static DownStreamChannelLocation deserialize(ByteBuffer byteBuffer) {
+    TEndPoint endPoint =
+        new TEndPoint(
+            ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
+    TFragmentInstanceId fragmentInstanceId =
+        new TFragmentInstanceId(
+            ReadWriteIOUtils.readString(byteBuffer),
+            ReadWriteIOUtils.readInt(byteBuffer),
+            ReadWriteIOUtils.readString(byteBuffer));
+    String remotePlanNodeId = ReadWriteIOUtils.readString(byteBuffer);
+    return new DownStreamChannelLocation(endPoint, fragmentInstanceId, remotePlanNodeId);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkChannel.java
new file mode 100644
index 0000000000..2072136cf7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkChannel.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
+
+public interface ISinkChannel {
+
+  void open();
+
+  boolean isNoMoreTsBlocks();
+
+  long getRetainedSizeInBytes();
+
+  int getNumOfBufferedTsBlocks();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
similarity index 66%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
index 4bceda0e15..1a3fc2ae52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
@@ -16,57 +16,67 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
 
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.util.List;
-
 public interface ISinkHandle {
 
   /** Get the local fragment instance ID that this sink handle belongs to. */
   TFragmentInstanceId getLocalFragmentInstanceId();
 
-  /** Get the total amount of memory used by buffered tsblocks. */
+  /** Get the total amount of memory used by buffered TsBlocks. */
   long getBufferRetainedSizeInBytes();
 
+  default ISinkHandle getChannel(int index) {
+    throw new UnsupportedOperationException();
+  }
+
   /** Get a future that will be completed when the output buffer is not full. */
   ListenableFuture<?> isFull();
 
   /**
-   * Send a list of tsblocks to an unpartitioned output buffer. If no-more-tsblocks has been set,
+   * Send a {@link TsBlock} to an un-partitioned output buffer. If no-more-TsBlocks has been set,
    * the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
    * will be thrown if any exception happened during the data transmission.
    */
   void send(TsBlock tsBlock);
 
   /**
-   * Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
-   * tsblock call is ignored. This can happen with limit queries. A {@link RuntimeException} will be
-   * thrown if any exception happened * during the data transmission.
+   * Notify the handle that there are no more TsBlocks. Any future calls to send a TsBlock should be
+   * ignored.
    */
-  void send(int partition, List<TsBlock> tsBlocks);
+  void setNoMoreTsBlocks();
 
   /**
-   * Notify the handle that there are no more tsblocks. Any future calls to send a tsblock should be
-   * ignored.
+   * Notify the handle that there are no more TsBlocks for the specified channel. Any future calls
+   * to send a TsBlock to the specified channel should be ignored.
+   *
+   * @param channelIndex index of the channel that should be closed
    */
-  void setNoMoreTsBlocks();
+  default void setNoMoreTsBlocksOfOneChannel(int channelIndex) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Open specified channel of ISinkHandle. */
+  default void tryOpenChannel(int channelIndex) {
+    throw new UnsupportedOperationException();
+  }
 
   /** If the handle is aborted. */
   boolean isAborted();
 
   /**
-   * If there are no more tsblocks to be sent and all the tsblocks have been fetched by downstream
+   * If there are no more TsBlocks to be sent and all the TsBlocks have been fetched by downstream
    * fragment instances.
    */
   boolean isFinished();
 
   /**
-   * Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
+   * Abort the sink handle. Discard all TsBlocks which may still be in the memory buffer and cancel
    * the future returned by {@link #isFull()}.
    *
    * <p>Should only be called in abnormal case
@@ -74,7 +84,7 @@ public interface ISinkHandle {
   void abort();
 
   /**
-   * Close the sink handle. Discard all tsblocks which may still be in the memory buffer and
+   * Close the sink handle. Discard all TsBlocks which may still be in the memory buffer and
    * complete the future returned by {@link #isFull()}.
    *
    * <p>Should only be called in normal case.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkHandle.java
similarity index 83%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkHandle.java
index 8afb07fe6e..48b617a1dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkHandle.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
 
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -29,15 +30,14 @@ import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
 import java.util.Optional;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL;
 
-public class LocalSinkHandle implements ISinkHandle {
+public class LocalSinkHandle implements ISinkHandle, ISinkChannel {
 
-  private static final Logger logger = LoggerFactory.getLogger(LocalSinkHandle.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(LocalSinkHandle.class);
 
   private TFragmentInstanceId localFragmentInstanceId;
   private final SinkHandleListener sinkHandleListener;
@@ -47,6 +47,8 @@ public class LocalSinkHandle implements ISinkHandle {
   private boolean aborted = false;
   private boolean closed = false;
 
+  private boolean noMoreTsBlocks = false;
+
   private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
 
   public LocalSinkHandle(SharedTsBlockQueue queue, SinkHandleListener sinkHandleListener) {
@@ -122,7 +124,7 @@ public class LocalSinkHandle implements ISinkHandle {
         if (queue.hasNoMoreTsBlocks()) {
           return;
         }
-        logger.debug("[StartSendTsBlockOnLocal]");
+        LOGGER.debug("[StartSendTsBlockOnLocal]");
         synchronized (this) {
           blocked = queue.add(tsBlock);
         }
@@ -133,16 +135,12 @@ public class LocalSinkHandle implements ISinkHandle {
     }
   }
 
-  @Override
-  public synchronized void send(int partition, List<TsBlock> tsBlocks) {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public void setNoMoreTsBlocks() {
     synchronized (queue) {
       synchronized (this) {
-        logger.debug("[StartSetNoMoreTsBlocksOnLocal]");
+        LOGGER.debug("[StartSetNoMoreTsBlocksOnLocal]");
+        noMoreTsBlocks = true;
         if (aborted || closed) {
           return;
         }
@@ -151,12 +149,12 @@ public class LocalSinkHandle implements ISinkHandle {
       }
     }
     checkAndInvokeOnFinished();
-    logger.debug("[EndSetNoMoreTsBlocksOnLocal]");
+    LOGGER.debug("[EndSetNoMoreTsBlocksOnLocal]");
   }
 
   @Override
   public void abort() {
-    logger.debug("[StartAbortLocalSinkHandle]");
+    LOGGER.debug("[StartAbortLocalSinkHandle]");
     synchronized (queue) {
       synchronized (this) {
         if (aborted || closed) {
@@ -171,12 +169,12 @@ public class LocalSinkHandle implements ISinkHandle {
         }
       }
     }
-    logger.debug("[EndAbortLocalSinkHandle]");
+    LOGGER.debug("[EndAbortLocalSinkHandle]");
   }
 
   @Override
   public void close() {
-    logger.debug("[StartCloseLocalSinkHandle]");
+    LOGGER.debug("[StartCloseLocalSinkHandle]");
     synchronized (queue) {
       synchronized (this) {
         if (aborted || closed) {
@@ -187,7 +185,7 @@ public class LocalSinkHandle implements ISinkHandle {
         sinkHandleListener.onFinish(this);
       }
     }
-    logger.debug("[EndCloseLocalSinkHandle]");
+    LOGGER.debug("[EndCloseLocalSinkHandle]");
   }
 
   public SharedTsBlockQueue getSharedTsBlockQueue() {
@@ -207,4 +205,26 @@ public class LocalSinkHandle implements ISinkHandle {
     // do nothing, the maxBytesCanReserve of SharedTsBlockQueue should be set by corresponding
     // LocalSourceHandle
   }
+
+  // region ============ ISinkChannel related ============
+
+  @Override
+  public void open() {}
+
+  @Override
+  public boolean isNoMoreTsBlocks() {
+    return noMoreTsBlocks;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return queue.getBufferRetainedSizeInBytes();
+  }
+
+  @Override
+  public int getNumOfBufferedTsBlocks() {
+    return queue.getNumOfBufferedTsBlocks();
+  }
+
+  // endregion
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
new file mode 100644
index 0000000000..4a4b7cbde4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE;
+
+/**
+ * An {@link ISinkHandle} that fans out to several downstream channels. Each call to {@link
+ * #send(TsBlock)} goes to the channel selected by {@code downStreamChannelIndex}; afterwards the
+ * configured {@code ShuffleStrategy} may select a different channel for the next call.
+ *
+ * <p>Every element of {@code downStreamChannelList} must also implement {@link ISinkChannel},
+ * because this class casts the elements to that type.
+ */
+public class ShuffleSinkHandle implements ISinkHandle {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleSinkHandle.class);
+
+  /** Each ISinkHandle in the list matches one downStream ISourceHandle */
+  private final List<ISinkHandle> downStreamChannelList;
+
+  /** hasSetNoMoreTsBlocks[i] is true once setNoMoreTsBlocks() has been called on channel i. */
+  private final boolean[] hasSetNoMoreTsBlocks;
+
+  /** channelOpened[i] is true once open() has been called on channel i. */
+  private final boolean[] channelOpened;
+
+  /** Holds the index of the channel that the next send() will use. */
+  private final DownStreamChannelIndex downStreamChannelIndex;
+
+  private final int channelNum;
+
+  private final ShuffleStrategy shuffleStrategy;
+
+  private final String localPlanNodeId;
+
+  private final TFragmentInstanceId localFragmentInstanceId;
+
+  private final MPPDataExchangeManager.SinkHandleListener sinkHandleListener;
+
+  private boolean aborted = false;
+
+  private boolean closed = false;
+
+  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
+  /** max bytes this ShuffleSinkHandle can reserve. */
+  private long maxBytesCanReserve =
+      IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
+
+  /**
+   * Creates the handle and opens channel 0.
+   *
+   * @throws NullPointerException if any handle, index, plan node id or listener argument is null
+   */
+  public ShuffleSinkHandle(
+      TFragmentInstanceId localFragmentInstanceId,
+      List<ISinkHandle> downStreamChannelList,
+      DownStreamChannelIndex downStreamChannelIndex,
+      ShuffleStrategyEnum shuffleStrategyEnum,
+      String localPlanNodeId,
+      MPPDataExchangeManager.SinkHandleListener sinkHandleListener) {
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.downStreamChannelList = Validate.notNull(downStreamChannelList);
+    this.downStreamChannelIndex = Validate.notNull(downStreamChannelIndex);
+    this.localPlanNodeId = Validate.notNull(localPlanNodeId);
+    this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+    this.channelNum = downStreamChannelList.size();
+    this.hasSetNoMoreTsBlocks = new boolean[channelNum];
+    this.channelOpened = new boolean[channelNum];
+    // The strategy must be created after channelNum is assigned: SimpleRoundRobinStrategy divides
+    // by channelNum in its field initializer and would otherwise see the default value 0.
+    this.shuffleStrategy = getShuffleStrategy(shuffleStrategyEnum);
+    // open first channel
+    tryOpenChannel(0);
+  }
+
+  @Override
+  public TFragmentInstanceId getLocalFragmentInstanceId() {
+    return localFragmentInstanceId;
+  }
+
+  @Override
+  public ISinkHandle getChannel(int index) {
+    return downStreamChannelList.get(index);
+  }
+
+  @Override
+  public synchronized ListenableFuture<?> isFull() {
+    // It is safe to use currentSinkHandle.isFull() to judge whether we can send a TsBlock only when
+    // downStreamChannelIndex will not be changed between we call isFull() and send() of
+    // ShuffleSinkHandle
+    ISinkHandle currentSinkHandle =
+        downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
+    return currentSinkHandle.isFull();
+  }
+
+  @Override
+  public synchronized void send(TsBlock tsBlock) {
+    long startTime = System.nanoTime();
+    try {
+      ISinkHandle currentSinkHandle =
+          downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
+      checkState();
+      currentSinkHandle.send(tsBlock);
+    } finally {
+      switchChannelIfNecessary();
+      QUERY_METRICS.recordDataExchangeCost(
+          SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
+    }
+  }
+
+  @Override
+  public synchronized void setNoMoreTsBlocks() {
+    for (int i = 0; i < downStreamChannelList.size(); i++) {
+      if (!hasSetNoMoreTsBlocks[i]) {
+        downStreamChannelList.get(i).setNoMoreTsBlocks();
+        hasSetNoMoreTsBlocks[i] = true;
+      }
+    }
+    sinkHandleListener.onEndOfBlocks(this);
+  }
+
+  @Override
+  public synchronized void setNoMoreTsBlocksOfOneChannel(int channelIndex) {
+    if (!hasSetNoMoreTsBlocks[channelIndex]) {
+      downStreamChannelList.get(channelIndex).setNoMoreTsBlocks();
+      hasSetNoMoreTsBlocks[channelIndex] = true;
+    }
+  }
+
+  @Override
+  public synchronized boolean isAborted() {
+    return aborted;
+  }
+
+  @Override
+  public synchronized boolean isFinished() {
+    for (ISinkHandle channel : downStreamChannelList) {
+      if (!channel.isFinished()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public synchronized void abort() {
+    if (aborted) {
+      return;
+    }
+    LOGGER.debug("[StartAbortShuffleSinkHandle]");
+    for (ISinkHandle channel : downStreamChannelList) {
+      try {
+        channel.abort();
+      } catch (Exception e) {
+        LOGGER.warn("Error occurred when try to abort channel.", e);
+      }
+    }
+    aborted = true;
+    sinkHandleListener.onAborted(this);
+    LOGGER.debug("[EndAbortShuffleSinkHandle]");
+  }
+
+  @Override
+  public synchronized void close() {
+    if (closed) {
+      return;
+    }
+    LOGGER.debug("[StartCloseShuffleSinkHandle]");
+    for (ISinkHandle channel : downStreamChannelList) {
+      try {
+        channel.close();
+      } catch (Exception e) {
+        LOGGER.warn("Error occurred when try to close channel.", e);
+      }
+    }
+    closed = true;
+    sinkHandleListener.onFinish(this);
+    LOGGER.debug("[EndCloseShuffleSinkHandle]");
+  }
+
+  @Override
+  public void setMaxBytesCanReserve(long maxBytesCanReserve) {
+    this.maxBytesCanReserve = maxBytesCanReserve;
+    downStreamChannelList.forEach(
+        sinkHandle -> sinkHandle.setMaxBytesCanReserve(maxBytesCanReserve));
+  }
+
+  private void checkState() {
+    if (aborted) {
+      throw new IllegalStateException("ShuffleSinkHandle is aborted.");
+    } else if (closed) {
+      throw new IllegalStateException("ShuffleSinkHandle is closed.");
+    }
+  }
+
+  /** Lets the strategy pick the next channel, then opens that channel if it is not open yet. */
+  private void switchChannelIfNecessary() {
+    shuffleStrategy.shuffle();
+    tryOpenChannel(downStreamChannelIndex.getCurrentIndex());
+  }
+
+  @Override
+  public void tryOpenChannel(int channelIndex) {
+    if (!channelOpened[channelIndex]) {
+      ((ISinkChannel) downStreamChannelList.get(channelIndex)).open();
+      channelOpened[channelIndex] = true;
+    }
+  }
+
+  // region ============ Shuffle Related ============
+  public enum ShuffleStrategyEnum {
+    PLAIN,
+    SIMPLE_ROUND_ROBIN,
+  }
+
+  @FunctionalInterface
+  interface ShuffleStrategy {
+    /*
+     SinkHandle may have multiple channels, we need to choose the next channel each time we send a TsBlock.
+    */
+    void shuffle();
+  }
+
+  class PlainShuffleStrategy implements ShuffleStrategy {
+
+    @Override
+    public void shuffle() {
+      // do nothing
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "PlainShuffleStrategy needs to do nothing, current channel index is {}",
+            downStreamChannelIndex.getCurrentIndex());
+      }
+    }
+  }
+
+  /**
+   * Starting from the channel after the current one, selects the first channel that has not been
+   * marked no-more-TsBlocks and whose buffer is within the size and block-count limits. If none
+   * qualifies, the current channel is kept.
+   */
+  class SimpleRoundRobinStrategy implements ShuffleStrategy {
+
+    private final long channelMemoryThreshold = maxBytesCanReserve / channelNum * 3;
+
+    @Override
+    public void shuffle() {
+      int currentIndex = downStreamChannelIndex.getCurrentIndex();
+      for (int i = 1; i < channelNum; i++) {
+        int nextIndex = (currentIndex + i) % channelNum;
+        if (satisfy(nextIndex)) {
+          downStreamChannelIndex.setCurrentIndex(nextIndex);
+          return;
+        }
+      }
+    }
+
+    private boolean satisfy(int channelIndex) {
+      // downStreamChannel is always an ISinkChannel
+      ISinkChannel channel = (ISinkChannel) downStreamChannelList.get(channelIndex);
+      if (channel.isNoMoreTsBlocks()) {
+        return false;
+      }
+      return channel.getRetainedSizeInBytes() <= channelMemoryThreshold
+          && channel.getNumOfBufferedTsBlocks() < 3;
+    }
+  }
+
+  private ShuffleStrategy getShuffleStrategy(ShuffleStrategyEnum strategyEnum) {
+    switch (strategyEnum) {
+      case PLAIN:
+        return new PlainShuffleStrategy();
+      case SIMPLE_ROUND_ROBIN:
+        return new SimpleRoundRobinStrategy();
+      default:
+        throw new UnsupportedOperationException("Unsupported type of shuffle strategy");
+    }
+  }
+
+  // endregion
+
+  // region ============= Test Only =============
+  @TestOnly
+  @Override
+  public long getBufferRetainedSizeInBytes() {
+    return downStreamChannelList.stream()
+        .map(ISinkHandle::getBufferRetainedSizeInBytes)
+        .reduce(Long::sum)
+        .orElse(0L);
+  }
+  // endregion
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkHandle.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkHandle.java
index fbc36ed34d..ff3ca62043 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkHandle.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
@@ -57,9 +57,9 @@ import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HAND
 import static org.apache.iotdb.db.mpp.metric.DataExchangeCountMetricSet.SEND_NEW_DATA_BLOCK_NUM_CALLER;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
-public class SinkHandle implements ISinkHandle {
+public class SinkHandle implements ISinkHandle, ISinkChannel {
 
-  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SinkHandle.class);
 
   public static final int MAX_ATTEMPT_TIMES = 3;
   private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000L;
@@ -80,7 +80,7 @@ public class SinkHandle implements ISinkHandle {
   private long retryIntervalInMs;
 
   // Use LinkedHashMap to meet 2 needs,
-  //   1. Predictable iteration order so that removing buffered tsblocks can be efficient.
+  //   1. Predictable iteration order so that removing buffered TsBlocks can be efficient.
   //   2. Fast lookup.
   private final LinkedHashMap<Integer, Pair<TsBlock, Long>> sequenceIdToTsBlock =
       new LinkedHashMap<>();
@@ -102,7 +102,7 @@ public class SinkHandle implements ISinkHandle {
 
   private boolean noMoreTsBlocks = false;
 
-  /** max bytes this SourceHandle can reserve. */
+  /** max bytes this SinkHandle can reserve. */
   private long maxBytesCanReserve =
       IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
 
@@ -138,20 +138,11 @@ public class SinkHandle implements ISinkHandle {
             localFragmentInstanceId.queryId,
             localFragmentInstanceId.fragmentId,
             localFragmentInstanceId.instanceId);
-    this.blocked =
-        localMemoryManager
-            .getQueryPool()
-            .reserve(
-                localFragmentInstanceId.getQueryId(),
-                fullFragmentInstanceId,
-                localPlanNodeId,
-                DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
-                DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know maxBytesCanReserve after
-            // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because
-            // at first this SinkHandle has not reserved memory.
-            .left;
     this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
     this.currentTsBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+    // delete this once ShuffleSinkHandle is finished
+    open();
   }
 
   @Override
@@ -203,14 +194,9 @@ public class SinkHandle implements ISinkHandle {
     }
   }
 
-  @Override
-  public synchronized void send(int partition, List<TsBlock> tsBlocks) {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public synchronized void setNoMoreTsBlocks() {
-    logger.debug("[StartSetNoMoreTsBlocks]");
+    LOGGER.debug("[StartSetNoMoreTsBlocks]");
     if (aborted || closed) {
       return;
     }
@@ -219,7 +205,7 @@ public class SinkHandle implements ISinkHandle {
 
   @Override
   public synchronized void abort() {
-    logger.debug("[StartAbortSinkHandle]");
+    LOGGER.debug("[StartAbortSinkHandle]");
     sequenceIdToTsBlock.clear();
     aborted = true;
     bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
@@ -238,12 +224,12 @@ public class SinkHandle implements ISinkHandle {
         .clearMemoryReservationMap(
             localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkHandleListener.onAborted(this);
-    logger.debug("[EndAbortSinkHandle]");
+    LOGGER.debug("[EndAbortSinkHandle]");
   }
 
   @Override
   public synchronized void close() {
-    logger.debug("[StartCloseSinkHandle]");
+    LOGGER.debug("[StartCloseSinkHandle]");
     sequenceIdToTsBlock.clear();
     closed = true;
     bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
@@ -262,7 +248,7 @@ public class SinkHandle implements ISinkHandle {
         .clearMemoryReservationMap(
             localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkHandleListener.onFinish(this);
-    logger.debug("[EndCloseSinkHandle]");
+    LOGGER.debug("[EndCloseSinkHandle]");
   }
 
   @Override
@@ -280,17 +266,13 @@ public class SinkHandle implements ISinkHandle {
     return bufferRetainedSizeInBytes;
   }
 
-  public int getNumOfBufferedTsBlocks() {
-    return sequenceIdToTsBlock.size();
-  }
-
-  ByteBuffer getSerializedTsBlock(int partition, int sequenceId) {
+  public ByteBuffer getSerializedTsBlock(int partition, int sequenceId) {
     throw new UnsupportedOperationException();
   }
 
-  synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
+  public synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
     if (aborted || closed) {
-      logger.warn(
+      LOGGER.warn(
           "SinkHandle still receive getting TsBlock request after being aborted={} or closed={}",
           aborted,
           closed);
@@ -298,7 +280,7 @@ public class SinkHandle implements ISinkHandle {
     }
     Pair<TsBlock, Long> pair = sequenceIdToTsBlock.get(sequenceId);
     if (pair == null || pair.left == null) {
-      logger.error(
+      LOGGER.warn(
           "The TsBlock doesn't exist. Sequence ID is {}, remaining map is {}",
           sequenceId,
           sequenceIdToTsBlock.entrySet());
@@ -307,7 +289,7 @@ public class SinkHandle implements ISinkHandle {
     return serde.serialize(pair.left);
   }
 
-  void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
+  public void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
     long freedBytes = 0L;
     synchronized (this) {
       if (aborted || closed) {
@@ -327,7 +309,7 @@ public class SinkHandle implements ISinkHandle {
         freedBytes += entry.getValue().right;
         bufferRetainedSizeInBytes -= entry.getValue().right;
         iterator.remove();
-        logger.debug("[ACKTsBlock] {}.", entry.getKey());
+        LOGGER.debug("[ACKTsBlock] {}.", entry.getKey());
       }
     }
     if (isFinished()) {
@@ -346,18 +328,7 @@ public class SinkHandle implements ISinkHandle {
     }
   }
 
-  public TEndPoint getRemoteEndpoint() {
-    return remoteEndpoint;
-  }
-
-  public TFragmentInstanceId getRemoteFragmentInstanceId() {
-    return remoteFragmentInstanceId;
-  }
-
-  public String getRemotePlanNodeId() {
-    return remotePlanNodeId;
-  }
-
+  @Override
   public TFragmentInstanceId getLocalFragmentInstanceId() {
     return localFragmentInstanceId;
   }
@@ -384,11 +355,49 @@ public class SinkHandle implements ISinkHandle {
     }
   }
 
+  // region ============ ISinkChannel related ============
+
+  public void open() {
+    // SinkHandle is opened when ShuffleSinkHandle chooses it as the next channel
+    this.blocked =
+        localMemoryManager
+            .getQueryPool()
+            .reserve(
+                localFragmentInstanceId.getQueryId(),
+                fullFragmentInstanceId,
+                localPlanNodeId,
+                DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+                DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know maxBytesCanReserve after
+            // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because
+            // at first this SinkHandle has not reserved memory.
+            .left;
+  }
+
+  @Override
+  public boolean isNoMoreTsBlocks() {
+    return noMoreTsBlocks;
+  }
+
+  @Override
+  public long getRetainedSizeInBytes() {
+    return bufferRetainedSizeInBytes;
+  }
+
+  @Override
+  public int getNumOfBufferedTsBlocks() {
+    return sequenceIdToTsBlock.size();
+  }
+
+  // endregion
+
+  // region ============ TestOnly ============
   @TestOnly
   public void setRetryIntervalInMs(long retryIntervalInMs) {
     this.retryIntervalInMs = retryIntervalInMs;
   }
+  // endregion
 
+  // region ============ inner class ============
   /**
    * Send a {@link org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent} to downstream fragment
    * instance.
@@ -411,7 +420,7 @@ public class SinkHandle implements ISinkHandle {
     @Override
     public void run() {
       try (SetThreadName sinkHandleName = new SetThreadName(threadName)) {
-        logger.debug(
+        LOGGER.debug(
             "[NotifyNewTsBlock] [{}, {})", startSequenceId, startSequenceId + blockSizes.size());
         int attempt = 0;
         TNewDataBlockEvent newDataBlockEvent =
@@ -429,7 +438,7 @@ public class SinkHandle implements ISinkHandle {
             client.onNewDataBlockEvent(newDataBlockEvent);
             break;
           } catch (Exception e) {
-            logger.warn("Failed to send new data block event, attempt times: {}", attempt, e);
+            LOGGER.warn("Failed to send new data block event, attempt times: {}", attempt, e);
             if (attempt == MAX_ATTEMPT_TIMES) {
               sinkHandleListener.onFailure(SinkHandle.this, e);
             }
@@ -458,7 +467,7 @@ public class SinkHandle implements ISinkHandle {
     @Override
     public void run() {
       try (SetThreadName sinkHandleName = new SetThreadName(threadName)) {
-        logger.debug("[NotifyNoMoreTsBlock]");
+        LOGGER.debug("[NotifyNoMoreTsBlock]");
         int attempt = 0;
         TEndOfDataBlockEvent endOfDataBlockEvent =
             new TEndOfDataBlockEvent(
@@ -473,9 +482,9 @@ public class SinkHandle implements ISinkHandle {
             client.onEndOfDataBlockEvent(endOfDataBlockEvent);
             break;
           } catch (Exception e) {
-            logger.warn("Failed to send end of data block event, attempt times: {}", attempt, e);
+            LOGGER.warn("Failed to send end of data block event, attempt times: {}", attempt, e);
             if (attempt == MAX_ATTEMPT_TIMES) {
-              logger.warn("Failed to send end of data block event after all retry", e);
+              LOGGER.warn("Failed to send end of data block event after all retry", e);
               sinkHandleListener.onFailure(SinkHandle.this, e);
               return;
             }
@@ -495,4 +504,6 @@ public class SinkHandle implements ISinkHandle {
       }
     }
   }
+  // endregion
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
index d056717060..14ac3429e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.source;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
index f47dea04d6..fd639ce0e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -17,10 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.source;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index 386fdd40b8..d03da2b10a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.source;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
@@ -72,6 +72,8 @@ public class SourceHandle implements ISourceHandle {
 
   private final String fullFragmentInstanceId;
   private final String localPlanNodeId;
+
+  private final int indexOfUpstreamSinkHandle;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
   private final TsBlockSerde serde;
@@ -117,6 +119,7 @@ public class SourceHandle implements ISourceHandle {
       TFragmentInstanceId remoteFragmentInstanceId,
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
+      int indexOfUpstreamSinkHandle,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService,
       TsBlockSerde serde,
@@ -129,6 +132,7 @@ public class SourceHandle implements ISourceHandle {
     this.fullFragmentInstanceId =
         FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
+    this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
     this.serde = Validate.notNull(serde);
@@ -278,7 +282,7 @@ public class SourceHandle implements ISourceHandle {
     return nonCancellationPropagating(blocked);
   }
 
-  synchronized void setNoMoreTsBlocks(int lastSequenceId) {
+  public synchronized void setNoMoreTsBlocks(int lastSequenceId) {
     logger.debug("[ReceiveNoMoreTsBlockEvent]");
     this.lastSequenceId = lastSequenceId;
     if (!blocked.isDone() && remoteTsBlockedConsumedUp()) {
@@ -289,7 +293,8 @@ public class SourceHandle implements ISourceHandle {
     }
   }
 
-  synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
+  public synchronized void updatePendingDataBlockInfo(
+      int startSequenceId, List<Long> dataBlockSizes) {
     logger.debug(
         "[ReceiveNewTsBlockNotification] [{}, {}), each size is: {}",
         startSequenceId,
@@ -467,7 +472,11 @@ public class SourceHandle implements ISourceHandle {
       try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
         logger.debug("[StartPullTsBlocksFromRemote] [{}, {}) ", startSequenceId, endSequenceId);
         TGetDataBlockRequest req =
-            new TGetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId);
+            new TGetDataBlockRequest(
+                remoteFragmentInstanceId,
+                startSequenceId,
+                endSequenceId,
+                indexOfUpstreamSinkHandle);
         int attempt = 0;
         while (attempt < MAX_ATTEMPT_TIMES) {
           attempt += 1;
@@ -565,7 +574,10 @@ public class SourceHandle implements ISourceHandle {
         int attempt = 0;
         TAcknowledgeDataBlockEvent acknowledgeDataBlockEvent =
             new TAcknowledgeDataBlockEvent(
-                remoteFragmentInstanceId, startSequenceId, endSequenceId);
+                remoteFragmentInstanceId,
+                startSequenceId,
+                endSequenceId,
+                indexOfUpstreamSinkHandle);
         while (attempt < MAX_ATTEMPT_TIMES) {
           attempt += 1;
           long startTime = System.nanoTime();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 14ada6d7c0..8c76bf989b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
 import org.apache.iotdb.db.utils.SetThreadName;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 794174b939..1e4f529a80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
 import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
new file mode 100644
index 0000000000..01ecb7f71b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.sink;
+
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+public class IdentitySinkOperator implements Operator {
+
+  private final OperatorContext operatorContext;
+  private final List<Operator> children;
+
+  private final DownStreamChannelIndex downStreamChannelIndex;
+
+  private final ISinkHandle sinkHandle;
+
+  private boolean needToReturnNull = false;
+
+  private boolean isFinished = false;
+
+  public IdentitySinkOperator(
+      OperatorContext operatorContext,
+      List<Operator> children,
+      DownStreamChannelIndex downStreamChannelIndex,
+      ISinkHandle sinkHandle) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.downStreamChannelIndex = downStreamChannelIndex;
+    this.sinkHandle = sinkHandle;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (children.get(downStreamChannelIndex.getCurrentIndex()).hasNext()) {
+      return true;
+    }
+    int currentIndex = downStreamChannelIndex.getCurrentIndex();
+    // current channel has no more data
+    sinkHandle.setNoMoreTsBlocksOfOneChannel(downStreamChannelIndex.getCurrentIndex());
+    currentIndex++;
+    if (currentIndex >= children.size()) {
+      isFinished = true;
+      return false;
+    }
+    downStreamChannelIndex.setCurrentIndex(currentIndex);
+    // If we reach here, the current index now points at a different child than the one
+    // isBlocked() was last called on, and we need to ensure that this child is not blocked.
+    // We set this field to true so that next() returns null and the Driver begins another loop.
+    needToReturnNull = true;
+    // tryOpenChannel first
+    sinkHandle.tryOpenChannel(currentIndex);
+    return true;
+  }
+
+  @Override
+  public TsBlock next() {
+    if (needToReturnNull) {
+      needToReturnNull = false;
+      return null;
+    }
+    return children.get(downStreamChannelIndex.getCurrentIndex()).next();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return children.get(downStreamChannelIndex.getCurrentIndex()).isBlocked();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return isFinished;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long maxReturnSize = 0;
+    for (Operator child : children) {
+      maxReturnSize = Math.max(maxReturnSize, child.calculateMaxReturnSize());
+    }
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
new file mode 100644
index 0000000000..7e643ca576
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.sink;
+
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ShuffleHelperOperator implements Operator {
+  private final OperatorContext operatorContext;
+  private final List<Operator> children;
+
+  private final DownStreamChannelIndex downStreamChannelIndex;
+
+  private final ISinkHandle sinkHandle;
+
+  private final Set<Integer> unfinishedChildren;
+
+  private boolean needToReturnNull = false;
+
+  public ShuffleHelperOperator(
+      OperatorContext operatorContext,
+      List<Operator> children,
+      DownStreamChannelIndex downStreamChannelIndex,
+      ISinkHandle sinkHandle) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.downStreamChannelIndex = downStreamChannelIndex;
+    this.sinkHandle = sinkHandle;
+    this.unfinishedChildren = new HashSet<>(children.size());
+    for (int i = 0; i < children.size(); i++) {
+      unfinishedChildren.add(i);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (children.get(downStreamChannelIndex.getCurrentIndex()).hasNext()) {
+      return true;
+    }
+    int currentIndex = downStreamChannelIndex.getCurrentIndex();
+    // current channel has no more data
+    sinkHandle.setNoMoreTsBlocksOfOneChannel(downStreamChannelIndex.getCurrentIndex());
+    unfinishedChildren.remove(currentIndex);
+    currentIndex = (currentIndex + 1) % children.size();
+    downStreamChannelIndex.setCurrentIndex(currentIndex);
+    // If we reach here, the current index now points at a different child than the one
+    // isBlocked() was last called on, and we need to ensure that this child is not blocked.
+    // We set this field to true so that next() returns null and the Driver begins another loop.
+    needToReturnNull = true;
+    // tryOpenChannel first
+    sinkHandle.tryOpenChannel(currentIndex);
+    return true;
+  }
+
+  @Override
+  public TsBlock next() {
+    if (needToReturnNull) {
+      needToReturnNull = false;
+      return null;
+    }
+    return children.get(downStreamChannelIndex.getCurrentIndex()).next();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return children.get(downStreamChannelIndex.getCurrentIndex()).isBlocked();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return unfinishedChildren.isEmpty();
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long maxReturnSize = 0;
+    for (Operator child : children) {
+      maxReturnSize = Math.max(maxReturnSize, child.calculateMaxReturnSize());
+    }
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 7e7498fbc1..ba57e1ff07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
-import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index c436b180f8..86ad87c780 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
 import org.apache.iotdb.db.mpp.execution.schedule.ExecutionContext;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.ID;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index ae8722f1d7..50278149c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -32,8 +32,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.QueryState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
-import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.mpp.metric.PerformanceOverviewMetricsManager;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -563,12 +563,14 @@ public class QueryExecution implements IQueryExecution {
                     context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
                     context.getResultNodeContext().getVirtualResultNodeId().getId(),
                     context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
+                    0, // Upstream of result ExchangeNode will only have one child.
                     stateMachine::transitionToFailed)
             : MPPDataExchangeService.getInstance()
                 .getMPPDataExchangeManager()
                 .createSourceHandle(
                     context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
                     context.getResultNodeContext().getVirtualResultNodeId().getId(),
+                    0,
                     upstreamEndPoint,
                     context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
                     stateMachine::transitionToFailed);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index e2b60ef441..a071b4b83c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.execution.memory;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 0404be866a..f6e808e2f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 49706b4ebe..74d96ccc9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -33,11 +33,13 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.NodeRef;
 import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
-import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
-import org.apache.iotdb.db.mpp.execution.exchange.LocalSinkHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ShuffleSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -112,6 +114,8 @@ import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryMergeOperato
 import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryOrderByHeatOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.source.SchemaSourceFactory;
+import org.apache.iotdb.db.mpp.execution.operator.sink.IdentitySinkOperator;
+import org.apache.iotdb.db.mpp.execution.operator.sink.ShuffleHelperOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
@@ -171,6 +175,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryColl
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.IdentitySinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -1806,10 +1812,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 localInstanceId.toThrift(),
                 node.getPlanNodeId().getId(),
                 remoteInstanceId.toThrift(),
+                node.getIndexOfUpstreamSinkHandle(),
                 context.getInstanceContext()::failed)
             : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
                 localInstanceId.toThrift(),
                 node.getPlanNodeId().getId(),
+                node.getIndexOfUpstreamSinkHandle(),
                 upstreamEndPoint,
                 remoteInstanceId.toThrift(),
                 context.getInstanceContext()::failed);
@@ -1854,6 +1862,74 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return child;
   }
 
+  @Override
+  public Operator visitIdentitySink(IdentitySinkNode node, LocalExecutionPlanContext context) {
+    context.addExchangeSumNum(1);
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                IdentitySinkOperator.class.getSimpleName());
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    List<Operator> children =
+        node.getChildren().stream()
+            .map(child -> child.accept(this, context))
+            .collect(Collectors.toList());
+
+    checkArgument(
+        MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
+    FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
+    DownStreamChannelIndex downStreamChannelIndex = new DownStreamChannelIndex(0);
+    ISinkHandle sinkHandle =
+        MPP_DATA_EXCHANGE_MANAGER.createShuffleSinkHandle(
+            node.getDownStreamChannelLocationList(),
+            downStreamChannelIndex,
+            ShuffleSinkHandle.ShuffleStrategyEnum.PLAIN,
+            localInstanceId.toThrift(),
+            node.getPlanNodeId().getId(),
+            context.getInstanceContext());
+    sinkHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve());
+    context.getDriverContext().setSinkHandle(sinkHandle);
+
+    return new IdentitySinkOperator(operatorContext, children, downStreamChannelIndex, sinkHandle);
+  }
+
+  @Override
+  public Operator visitShuffleSink(ShuffleSinkNode node, LocalExecutionPlanContext context) {
+    context.addExchangeSumNum(1);
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                ShuffleHelperOperator.class.getSimpleName());
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    List<Operator> children =
+        node.getChildren().stream()
+            .map(child -> child.accept(this, context))
+            .collect(Collectors.toList());
+
+    checkArgument(
+        MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
+    FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
+    DownStreamChannelIndex downStreamChannelIndex = new DownStreamChannelIndex(0);
+    ISinkHandle sinkHandle =
+        MPP_DATA_EXCHANGE_MANAGER.createShuffleSinkHandle(
+            node.getDownStreamChannelLocationList(),
+            downStreamChannelIndex,
+            ShuffleSinkHandle.ShuffleStrategyEnum.SIMPLE_ROUND_ROBIN,
+            localInstanceId.toThrift(),
+            node.getPlanNodeId().getId(),
+            context.getInstanceContext());
+    sinkHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve());
+    context.getDriverContext().setSinkHandle(sinkHandle);
+
+    return new ShuffleHelperOperator(operatorContext, children, downStreamChannelIndex, sinkHandle);
+  }
+
   @Override
   public Operator visitSchemaFetchMerge(
       SchemaFetchMergeNode node, LocalExecutionPlanContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 2527f5a74f..7b598ef8c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -73,6 +73,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryColl
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.IdentitySinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -162,7 +164,9 @@ public enum PlanNodeType {
   MERGE_SORT((short) 66),
   SHOW_QUERIES((short) 67),
   INTERNAL_BATCH_ACTIVATE_TEMPLATE((short) 68),
-  INTERNAL_CREATE_MULTI_TIMESERIES((short) 69);
+  INTERNAL_CREATE_MULTI_TIMESERIES((short) 69),
+  IDENTITY_SINK((short) 70),
+  SHUFFLE_SINK((short) 71);
 
   public static final int BYTES = Short.BYTES;
 
@@ -351,6 +355,10 @@ public enum PlanNodeType {
         return InternalBatchActivateTemplateNode.deserialize(buffer);
       case 69:
         return InternalCreateMultiTimeSeriesNode.deserialize(buffer);
+      case 70:
+        return IdentitySinkNode.deserialize(buffer);
+      case 71:
+        return ShuffleSinkNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8181efcfa3..2e87d388c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -71,6 +71,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryColl
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.IdentitySinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -356,4 +358,12 @@ public abstract class PlanVisitor<R, C> {
   public R visitInternalCreateMultiTimeSeries(InternalCreateMultiTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitIdentitySink(IdentitySinkNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitShuffleSink(ShuffleSinkNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index 06310be926..7e35ec85da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -50,6 +50,9 @@ public class ExchangeNode extends SingleChildProcessNode {
 
   private List<String> outputColumnNames;
 
+  /** Exchange needs to know which child of IdentitySinkNode/ShuffleSinkNode it matches */
+  private int indexOfUpstreamSinkHandle = 0;
+
   public ExchangeNode(PlanNodeId id) {
     super(id);
   }
@@ -102,10 +105,12 @@ public class ExchangeNode extends SingleChildProcessNode {
       outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
       outputColumnNamesSize--;
     }
+    int index = ReadWriteIOUtils.readInt(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
     exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
     exchangeNode.setOutputColumnNames(outputColumnNames);
+    exchangeNode.setIndexOfUpstreamSinkHandle(index);
     return exchangeNode;
   }
 
@@ -120,6 +125,7 @@ public class ExchangeNode extends SingleChildProcessNode {
     for (String outputColumnName : outputColumnNames) {
       ReadWriteIOUtils.write(outputColumnName, byteBuffer);
     }
+    ReadWriteIOUtils.write(indexOfUpstreamSinkHandle, byteBuffer);
   }
 
   @Override
@@ -133,6 +139,7 @@ public class ExchangeNode extends SingleChildProcessNode {
     for (String outputColumnName : outputColumnNames) {
       ReadWriteIOUtils.write(outputColumnName, stream);
     }
+    ReadWriteIOUtils.write(indexOfUpstreamSinkHandle, stream);
   }
 
   @Override
@@ -154,6 +161,14 @@ public class ExchangeNode extends SingleChildProcessNode {
     return remoteSourceNode;
   }
 
+  public int getIndexOfUpstreamSinkHandle() {
+    return indexOfUpstreamSinkHandle;
+  }
+
+  public void setIndexOfUpstreamSinkHandle(int indexOfUpstreamSinkHandle) {
+    this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
+  }
+
   public void setRemoteSourceNode(FragmentSinkNode remoteSourceNode) {
     this.remoteSourceNode = remoteSourceNode;
     this.setOutputColumnNames(remoteSourceNode.getOutputColumnNames());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/IdentitySinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/IdentitySinkNode.java
new file mode 100644
index 0000000000..a4e549245c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/IdentitySinkNode.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.sink;
+
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class IdentitySinkNode extends MultiChildrenSinkNode {
+
+  public IdentitySinkNode(
+      PlanNodeId id, List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id, downStreamChannelLocationList);
+  }
+
+  public IdentitySinkNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id, children, downStreamChannelLocationList);
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new IdentitySinkNode(getPlanNodeId(), getDownStreamChannelLocationList());
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.IDENTITY_SINK.serialize(byteBuffer);
+    ReadWriteIOUtils.write(downStreamChannelLocationList.size(), byteBuffer);
+    for (DownStreamChannelLocation downStreamChannelLocation : downStreamChannelLocationList) {
+      downStreamChannelLocation.serialize(byteBuffer);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.IDENTITY_SINK.serialize(stream);
+    ReadWriteIOUtils.write(downStreamChannelLocationList.size(), stream);
+    for (DownStreamChannelLocation downStreamChannelLocation : downStreamChannelLocationList) {
+      downStreamChannelLocation.serialize(stream);
+    }
+  }
+
+  public static IdentitySinkNode deserialize(ByteBuffer byteBuffer) {
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<DownStreamChannelLocation> downStreamChannelLocationList = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      downStreamChannelLocationList.add(DownStreamChannelLocation.deserialize(byteBuffer));
+    }
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new IdentitySinkNode(planNodeId, downStreamChannelLocationList);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
new file mode 100644
index 0000000000..f1fbd92e3f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.sink;
+
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public abstract class MultiChildrenSinkNode extends SinkNode {
+
+  protected List<PlanNode> children;
+
+  protected final List<DownStreamChannelLocation> downStreamChannelLocationList;
+
+  public MultiChildrenSinkNode(PlanNodeId id) {
+    super(id);
+    this.children = new ArrayList<>();
+    this.downStreamChannelLocationList = new ArrayList<>();
+  }
+
+  protected MultiChildrenSinkNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id);
+    this.children = children;
+    this.downStreamChannelLocationList = downStreamChannelLocationList;
+  }
+
+  protected MultiChildrenSinkNode(
+      PlanNodeId id, List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id);
+    this.children = new ArrayList<>();
+    this.downStreamChannelLocationList = downStreamChannelLocationList;
+  }
+
+  public void setChildren(List<PlanNode> children) {
+    this.children = children;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    this.children.add(child);
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // Reject null, a different concrete class, or a mismatch in the superclass comparison.
+    if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+      return false;
+    }
+    // NOTE(review): only children are compared; downStreamChannelLocationList is ignored here
+    // and in hashCode(). Confirm this is intended before relying on equality of sinks.
+    MultiChildrenSinkNode other = (MultiChildrenSinkNode) o;
+    return children.equals(other.children);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), children);
+  }
+
+  @Override
+  public void send() {}
+
+  @Override
+  public void close() throws Exception {}
+
+  public List<DownStreamChannelLocation> getDownStreamChannelLocationList() {
+    return downStreamChannelLocationList;
+  }
+
+  public void addDownStreamChannelLocation(DownStreamChannelLocation downStreamChannelLocation) {
+    downStreamChannelLocationList.add(downStreamChannelLocation);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/ShuffleSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/ShuffleSinkNode.java
new file mode 100644
index 0000000000..81937f8140
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/ShuffleSinkNode.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.sink;
+
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Responsible for creating ShuffleHelperOperator and corresponding SinkHandle */
+public class ShuffleSinkNode extends MultiChildrenSinkNode {
+
+  public ShuffleSinkNode(
+      PlanNodeId id, List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id, downStreamChannelLocationList);
+  }
+
+  public ShuffleSinkNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id, children, downStreamChannelLocationList);
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new ShuffleSinkNode(getPlanNodeId(), getDownStreamChannelLocationList());
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.SHUFFLE_SINK.serialize(byteBuffer);
+    ReadWriteIOUtils.write(downStreamChannelLocationList.size(), byteBuffer);
+    for (DownStreamChannelLocation downStreamChannelLocation : downStreamChannelLocationList) {
+      downStreamChannelLocation.serialize(byteBuffer);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.SHUFFLE_SINK.serialize(stream);
+    ReadWriteIOUtils.write(downStreamChannelLocationList.size(), stream);
+    for (DownStreamChannelLocation downStreamChannelLocation : downStreamChannelLocationList) {
+      downStreamChannelLocation.serialize(stream);
+    }
+  }
+
+  public static ShuffleSinkNode deserialize(ByteBuffer byteBuffer) {
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<DownStreamChannelLocation> downStreamChannelLocationList = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      downStreamChannelLocationList.add(DownStreamChannelLocation.deserialize(byteBuffer));
+    }
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new ShuffleSinkNode(planNodeId, downStreamChannelLocationList);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 8bcb1b300b..1ac1c32e91 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index 674a80ce3e..aa15f19967 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
index 786d5b1830..3112e64f4b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -68,7 +72,7 @@ public class MPPDataExchangeManagerTest {
 
     ISourceHandle localSourceHandle =
         mppDataExchangeManager.createLocalSourceHandleForFragment(
-            remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, t -> {});
+            remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, 0, t -> {});
 
     Assert.assertTrue(localSourceHandle instanceof LocalSourceHandle);
 
@@ -101,7 +105,7 @@ public class MPPDataExchangeManagerTest {
 
     ISourceHandle localSourceHandle =
         mppDataExchangeManager.createLocalSourceHandleForFragment(
-            localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId, t -> {});
+            localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId, 0, t -> {});
 
     Assert.assertTrue(localSourceHandle instanceof LocalSourceHandle);
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index d50e69dd2a..67209c760d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.SinkHandle;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
 import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index d0b8f49f29..8d6e7cd7d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
@@ -113,6 +114,7 @@ public class SourceHandleTest {
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
+            0,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
             mockTsBlockSerde,
@@ -228,6 +230,7 @@ public class SourceHandleTest {
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
+            0,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
             mockTsBlockSerde,
@@ -388,6 +391,7 @@ public class SourceHandleTest {
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
+            0,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
             mockTsBlockSerde,
@@ -560,6 +564,7 @@ public class SourceHandleTest {
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
+            0,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
             mockTsBlockSerde,
@@ -647,6 +652,7 @@ public class SourceHandleTest {
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
+            0,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
             mockTsBlockSerde,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index 0e7b5ffb96..b3b6437769 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.exchange;
 
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -63,11 +64,6 @@ public class StubSinkHandle implements ISinkHandle {
     this.tsBlocks.add(tsBlock);
   }
 
-  @Override
-  public void send(int partition, List<TsBlock> tsBlocks) {
-    this.tsBlocks.addAll(tsBlocks);
-  }
-
   @Override
   public void setNoMoreTsBlocks() {
     if (closed) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/IdentitySinkNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/IdentitySinkNodeSerdeTest.java
new file mode 100644
index 0000000000..b09f34ea5d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/IdentitySinkNodeSerdeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.node.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.IdentitySinkNode;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class IdentitySinkNodeSerdeTest {
+  /** Serializes a node into a ByteBuffer and expects the deserialized node to equal it. */
+  @Test
+  public void testSerializeAndDeserialize() throws IllegalPathException {
+    DownStreamChannelLocation downStreamChannelLocation =
+        new DownStreamChannelLocation(
+            new TEndPoint("test", 1), new TFragmentInstanceId("test", 1, "test"), "test");
+    IdentitySinkNode identitySinkNode1 =
+        new IdentitySinkNode(
+            new PlanNodeId("testIdentitySinkNode"),
+            Collections.singletonList(downStreamChannelLocation));
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    identitySinkNode1.serialize(byteBuffer);
+    byteBuffer.flip();
+    assertEquals(identitySinkNode1, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+    // Same round trip with an empty downstream channel location list.
+    IdentitySinkNode identitySinkNode2 =
+        new IdentitySinkNode(new PlanNodeId("testIdentitySinkNode"), Collections.emptyList());
+    ByteBuffer byteBuffer2 = ByteBuffer.allocate(1024);
+    identitySinkNode2.serialize(byteBuffer2);
+    byteBuffer2.flip();
+    assertEquals(identitySinkNode2, PlanNodeDeserializeHelper.deserialize(byteBuffer2));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/ShuffleSinkNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/ShuffleSinkNodeSerdeTest.java
new file mode 100644
index 0000000000..c3c9c69358
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/ShuffleSinkNodeSerdeTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.node.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class ShuffleSinkNodeSerdeTest {
+  @Test
+  public void testSerializeAndDeserialize() throws IllegalPathException {
+    DownStreamChannelLocation downStreamChannelLocation =
+        new DownStreamChannelLocation(
+            new TEndPoint("test", 1), new TFragmentInstanceId("test", 1, "test"), "test");
+    ShuffleSinkNode shuffleSinkNode1 =
+        new ShuffleSinkNode(
+            new PlanNodeId("testShuffleSinkNode"),
+            Collections.singletonList(downStreamChannelLocation));
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    shuffleSinkNode1.serialize(byteBuffer);
+    byteBuffer.flip();
+    assertEquals(shuffleSinkNode1, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+    // Same round trip with an empty downstream channel location list.
+    ShuffleSinkNode shuffleSinkNode2 =
+        new ShuffleSinkNode(new PlanNodeId("testShuffleSinkNode"), Collections.emptyList());
+    ByteBuffer byteBuffer2 = ByteBuffer.allocate(1024);
+    shuffleSinkNode2.serialize(byteBuffer2);
+    byteBuffer2.flip();
+    assertEquals(shuffleSinkNode2, PlanNodeDeserializeHelper.deserialize(byteBuffer2));
+  }
+}
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index cf3fab6143..23fe7db8b0 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -63,6 +63,8 @@ struct TGetDataBlockRequest {
   1: required TFragmentInstanceId sourceFragmentInstanceId
   2: required i32 startSequenceId
   3: required i32 endSequenceId
+  // index of upstream SinkHandle
+  4: required i32 index
 }
 
 struct TGetDataBlockResponse {
@@ -73,6 +75,8 @@ struct TAcknowledgeDataBlockEvent {
   1: required TFragmentInstanceId sourceFragmentInstanceId
   2: required i32 startSequenceId
   3: required i32 endSequenceId
+  // index of upstream SinkHandle
+  4: required i32 index
 }
 
 struct TNewDataBlockEvent {