You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/17 01:58:16 UTC
[iotdb] branch ShuffleFeature updated: Shuffle sink
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ShuffleFeature
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ShuffleFeature by this push:
new ea47360f53 Shuffle sink
ea47360f53 is described below
commit ea47360f5379f9b90a8a8b30a58d65174c2a556a
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Feb 17 09:58:10 2023 +0800
Shuffle sink
---
README.md | 2 +-
docs/UserGuide/API/Programming-Java-Native-API.md | 2 +-
.../Data-Concept/Data-Model-and-Terminology.md | 2 +-
.../UserGuide/API/Programming-Java-Native-API.md | 2 +-
.../Data-Concept/Data-Model-and-Terminology.md | 2 +-
.../iotdb/db/it/query/IoTDBShuffleSink1IT.java | 150 +++++++++++
.../iotdb/db/it/query/IoTDBShuffleSink2IT.java | 154 +++++++++++
.../iotdb/db/mpp/execution/driver/Driver.java | 2 +-
.../db/mpp/execution/driver/DriverContext.java | 2 +-
.../iotdb/db/mpp/execution/driver/IDriver.java | 2 +-
.../exchange/IMPPDataExchangeManager.java | 32 ++-
.../execution/exchange/MPPDataExchangeManager.java | 291 ++++++++++++--------
.../mpp/execution/exchange/SharedTsBlockQueue.java | 6 +
.../exchange/sink/DownStreamChannelIndex.java | 37 +++
.../exchange/sink/DownStreamChannelLocation.java | 93 +++++++
.../mpp/execution/exchange/sink/ISinkChannel.java | 31 +++
.../execution/exchange/{ => sink}/ISinkHandle.java | 40 +--
.../exchange/{ => sink}/LocalSinkHandle.java | 52 ++--
.../execution/exchange/sink/ShuffleSinkHandle.java | 299 +++++++++++++++++++++
.../execution/exchange/{ => sink}/SinkHandle.java | 119 ++++----
.../exchange/{ => source}/ISourceHandle.java | 2 +-
.../exchange/{ => source}/LocalSourceHandle.java | 3 +-
.../exchange/{ => source}/SourceHandle.java | 22 +-
.../fragment/FragmentInstanceExecution.java | 2 +-
.../fragment/FragmentInstanceManager.java | 2 +-
.../operator/sink/IdentitySinkOperator.java | 132 +++++++++
.../operator/sink/ShuffleHelperOperator.java | 134 +++++++++
.../operator/source/ExchangeOperator.java | 2 +-
.../db/mpp/execution/schedule/task/DriverTask.java | 2 +-
.../db/mpp/plan/execution/QueryExecution.java | 4 +-
.../plan/execution/memory/MemorySourceHandle.java | 2 +-
.../plan/planner/LocalExecutionPlanContext.java | 2 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 82 +++++-
.../mpp/plan/planner/plan/node/PlanNodeType.java | 10 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 10 +
.../planner/plan/node/process/ExchangeNode.java | 15 ++
.../planner/plan/node/sink/IdentitySinkNode.java | 85 ++++++
.../plan/node/sink/MultiChildrenSinkNode.java | 110 ++++++++
.../planner/plan/node/sink/ShuffleSinkNode.java | 91 +++++++
.../execution/exchange/LocalSinkHandleTest.java | 2 +
.../execution/exchange/LocalSourceHandleTest.java | 1 +
.../exchange/MPPDataExchangeManagerTest.java | 8 +-
.../db/mpp/execution/exchange/SinkHandleTest.java | 1 +
.../mpp/execution/exchange/SourceHandleTest.java | 6 +
.../db/mpp/execution/exchange/StubSinkHandle.java | 6 +-
.../plan/node/sink/IdentitySinkNodeSerdeTest.java | 60 +++++
.../plan/node/sink/ShuffleSinkNodeSerdeTest.java | 59 ++++
thrift/src/main/thrift/datanode.thrift | 4 +
48 files changed, 1938 insertions(+), 241 deletions(-)
diff --git a/README.md b/README.md
index 78409964fa..acf9f04792 100644
--- a/README.md
+++ b/README.md
@@ -272,7 +272,7 @@ IoTDB> SHOW DATABASES
Total line number = 1
```
-After the database is set, we can use CREATE TIMESERIES to create a new timeseries. When creating a timeseries, we should define its data type and the encoding scheme. Here We create two timeseries:
+After the database is set, we can use CREATE TIMESERIES to create a new timeseries. When creating a timeseries, we should define its data type and the encoding scheme. Here we create two timeseries:
```
IoTDB> CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN
diff --git a/docs/UserGuide/API/Programming-Java-Native-API.md b/docs/UserGuide/API/Programming-Java-Native-API.md
index bb8871f514..817223155d 100644
--- a/docs/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/UserGuide/API/Programming-Java-Native-API.md
@@ -47,7 +47,7 @@ In root directory:
## Syntax Convention
-- **IoTDB-SQL interface:** The input SQL parameter needs to conform to the [syntax conventions](../Reference/Syntax-Conventions.md) and be escaped for JAVA strings. For example, you need to add a backslash before the double-quotes. (That is: after JAVA escaping, it is consistent with the SQL statement executed on the command line.)
+- **IoTDB-SQL interface:** The input SQL parameter needs to conform to the [syntax conventions](../Syntax-Conventions/Literal-Values.md) and be escaped for JAVA strings. For example, you need to add a backslash before the double-quotes. (That is: after JAVA escaping, it is consistent with the SQL statement executed on the command line.)
- **Other interfaces:**
- The node names in path or path prefix as parameter: The node names which should be escaped by backticks (`) in the SQL statement, escaping is required here.
- Identifiers (such as template names) as parameters: The identifiers which should be escaped by backticks (`) in the SQL statement, and escaping is not required here.
diff --git a/docs/UserGuide/Data-Concept/Data-Model-and-Terminology.md b/docs/UserGuide/Data-Concept/Data-Model-and-Terminology.md
index 393630fd03..5fc169cbda 100644
--- a/docs/UserGuide/Data-Concept/Data-Model-and-Terminology.md
+++ b/docs/UserGuide/Data-Concept/Data-Model-and-Terminology.md
@@ -85,7 +85,7 @@ The following are the constraints on the `nodeName`:
* [ 0-9 a-z A-Z _ ] (letters, numbers, underscore)
* ['\u2E80'..'\u9FFF'] (Chinese characters)
* In particular, if the system is deployed on a Windows machine, the database layer name will be case-insensitive. For example, creating both `root.ln` and `root.LN` at the same time is not allowed.
-* If you want to use special characters in `nodeName`, you can quote it with back quote, detailed information can be found here: [Syntax-Conventions](https://iotdb.apache.org/UserGuide/Master/Reference/Syntax-Conventions.html).
+* If you want to use special characters in `nodeName`, you can quote it with back quotes. Detailed information can be found in the Syntax-Conventions chapter: [Syntax-Conventions](https://iotdb.apache.org/UserGuide/Master/Syntax-Conventions/Literal-Values.html).
### Path Pattern
diff --git a/docs/zh/UserGuide/API/Programming-Java-Native-API.md b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
index ed79f577b1..f6e8004562 100644
--- a/docs/zh/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
@@ -50,7 +50,7 @@ mvn clean install -pl session -am -Dmaven.test.skip=true
## 语法说明
- - 对于 IoTDB-SQL 接口:传入的 SQL 参数需要符合 [语法规范](../Reference/Syntax-Conventions.md) ,并且针对 JAVA 字符串进行反转义,如双引号前需要加反斜杠。(即:经 JAVA 转义之后与命令行执行的 SQL 语句一致。)
+ - 对于 IoTDB-SQL 接口:传入的 SQL 参数需要符合 [语法规范](../Syntax-Conventions/Literal-Values.md) ,并且针对 JAVA 字符串进行反转义,如双引号前需要加反斜杠。(即:经 JAVA 转义之后与命令行执行的 SQL 语句一致。)
- 对于其他接口:
- 经参数传入的路径或路径前缀中的节点: 在 SQL 语句中需要使用反引号(`)进行转义的,此处均需要进行转义。
- 经参数传入的标识符(如模板名):在 SQL 语句中需要使用反引号(`)进行转义的,均可以不用进行转义。
diff --git a/docs/zh/UserGuide/Data-Concept/Data-Model-and-Terminology.md b/docs/zh/UserGuide/Data-Concept/Data-Model-and-Terminology.md
index e559251312..59f313ad1b 100644
--- a/docs/zh/UserGuide/Data-Concept/Data-Model-and-Terminology.md
+++ b/docs/zh/UserGuide/Data-Concept/Data-Model-and-Terminology.md
@@ -88,7 +88,7 @@ wildcard
* [ 0-9 a-z A-Z _ ] (字母,数字,下划线)
* ['\u2E80'..'\u9FFF'] (UNICODE 中文字符)
* 特别地,如果系统在 Windows 系统上部署,那么 database 路径结点名是大小写不敏感的。例如,同时创建`root.ln` 和 `root.LN` 是不被允许的。
-* 如果需要在路径结点名中用特殊字符,可以用反引号引用路径结点名,具体使用方法可以参考[语法约定](../Reference/Syntax-Conventions.md)。
+* 如果需要在路径结点名中用特殊字符,可以用反引号引用路径结点名,具体使用方法可以参考[语法约定](../Syntax-Conventions/Literal-Values.md)。
### 路径模式(Path Pattern)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink1IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink1IT.java
new file mode 100644
index 0000000000..aff8691039
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink1IT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.query;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+
+/**
+ * Integration tests for {@code count} aggregation queries that use {@code align by device}.
+ *
+ * <p>The environment is started with the CUSTOM data-region-group extension policy and two data
+ * region groups per database. Two fixtures are loaded once for the whole class:
+ *
+ * <ul>
+ *   <li>{@code root.single}: devices d1 and d2, each with the single series {@code s1};
+ *   <li>{@code root.sg}: devices d1 and d2, each with the series {@code s1} and {@code s2}.
+ * </ul>
+ *
+ * <p>In both fixtures d1 holds the values 2 and 3, and d2 holds the values 4 and 5, so the filter
+ * {@code s1 <= 4} keeps two points of d1 and one point of d2.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBShuffleSink1IT {
+  // two devices, a single series (s1) on each
+  private static final String[] SINGLE_SERIES =
+      new String[] {
+        "create database root.single",
+        "insert into root.single.d1(time,s1) values (1,2)",
+        "insert into root.single.d1(time,s1) values (now(),3)",
+        "insert into root.single.d2(time,s1) values (1,4)",
+        "insert into root.single.d2(time,s1) values (now(),5)"
+      };
+  // two devices, two series (s1, s2) on each
+  private static final String[] MULTI_SERIES =
+      new String[] {
+        "create database root.sg",
+        "insert into root.sg.d1(time,s1,s2) values (1,2,2)",
+        "insert into root.sg.d1(time,s1,s2) values (now(),3,3)",
+        "insert into root.sg.d2(time,s1,s2) values (1,4,4)",
+        "insert into root.sg.d2(time,s1,s2) values (now(),5,5)"
+      };
+
+  /** Configures two data region groups per database, starts the environment, loads fixtures. */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setDataRegionGroupExtensionPolicy("CUSTOM");
+    EnvFactory.getEnv().getConfig().getCommonConfig().setDefaultDataRegionGroupNumPerDatabase(2);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(SINGLE_SERIES);
+    prepareData(MULTI_SERIES);
+  }
+
+  /** Tears down the environment started in {@link #setUp()}. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  /** Unfiltered count with the default ordering: every device reports both of its points. */
+  @Test
+  public void testCountAlignByDeviceOrderByDeviceWithoutValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** align by device", expectedHeader1, retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** align by device", expectedHeader2, retArray2);
+  }
+
+  /** Count with {@code s1 <= 4} and the default ordering: d1 keeps 2 points, d2 keeps 1. */
+  @Test
+  public void testCountAlignByDeviceOrderByDeviceWithValueFilter() {
+    // result of SINGLE_SERIES; only s1 exists there, so only count(s1) is selected
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,1"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** where s1 <= 4 align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,1,1"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** where s1 <= 4 align by device",
+        expectedHeader2,
+        retArray2);
+  }
+
+  /** Unfiltered count with {@code order by time}: every device reports both of its points. */
+  @Test
+  public void testCountAlignByDeviceOrderByTimeWithoutValueFilter() {
+    // result of SINGLE_SERIES; only s1 exists there, so only count(s1) is selected
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** order by time align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** order by time align by device",
+        expectedHeader2,
+        retArray2);
+  }
+
+  /** Count with {@code s1 <= 4} and {@code order by time}: d1 keeps 2 points, d2 keeps 1. */
+  @Test
+  public void testCountAlignByDeviceOrderByTimeWithValueFilter() {
+    // result of SINGLE_SERIES; only s1 exists there, so only count(s1) is selected
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,1"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** where s1 <= 4 order by time align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,1,1"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** where s1 <= 4 order by time align by device",
+        expectedHeader2,
+        retArray2);
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink2IT.java
new file mode 100644
index 0000000000..3be25557f4
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBShuffleSink2IT.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.query;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+
+/**
+ * Integration tests for {@code count} aggregation queries that use {@code align by device}.
+ *
+ * <p>The environment is started with the CUSTOM data-region-group extension policy and three data
+ * region groups per database. Two fixtures are loaded once for the whole class:
+ *
+ * <ul>
+ *   <li>{@code root.single}: devices d1, d2 and d3, each with the single series {@code s1};
+ *   <li>{@code root.sg}: devices d1, d2 and d3, each with the series {@code s1} and {@code s2}.
+ * </ul>
+ *
+ * <p>In both fixtures d1 holds the values 1 and 2, d2 holds 3 and 4, and d3 holds 5 and 6, so the
+ * filter {@code s1 <= 4} keeps two points of d1, two points of d2 and no point of d3.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBShuffleSink2IT {
+  // three devices, a single series (s1) on each; one value per listed measurement
+  private static final String[] SINGLE_SERIES =
+      new String[] {
+        "create database root.single",
+        "insert into root.single.d1(time,s1) values (1,1)",
+        "insert into root.single.d1(time,s1) values (now(),2)",
+        "insert into root.single.d2(time,s1) values (now(),3)",
+        "insert into root.single.d2(time,s1) values (1,4)",
+        "insert into root.single.d3(time,s1) values (now(),5)",
+        "insert into root.single.d3(time,s1) values (1,6)"
+      };
+  // three devices, three data regions
+  private static final String[] MULTI_SERIES =
+      new String[] {
+        "create database root.sg",
+        "insert into root.sg.d1(time,s1,s2) values (1,1,1)",
+        "insert into root.sg.d1(time,s1,s2) values (now(),2,2)",
+        "insert into root.sg.d2(time,s1,s2) values (now(),3,3)",
+        "insert into root.sg.d2(time,s1,s2) values (1,4,4)",
+        "insert into root.sg.d3(time,s1,s2) values (now(),5,5)",
+        "insert into root.sg.d3(time,s1,s2) values (1,6,6)"
+      };
+
+  /** Configures three data region groups per database, starts the environment, loads fixtures. */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setDataRegionGroupExtensionPolicy("CUSTOM");
+    EnvFactory.getEnv().getConfig().getCommonConfig().setDefaultDataRegionGroupNumPerDatabase(3);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(SINGLE_SERIES);
+    prepareData(MULTI_SERIES);
+  }
+
+  /** Tears down the environment started in {@link #setUp()}. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  /** Unfiltered count with the default ordering: every device reports both of its points. */
+  @Test
+  public void testCountAlignByDeviceOrderByDeviceWithoutValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2", "root.single.d3,2"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** align by device", expectedHeader1, retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2", "root.sg.d3,2,2"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** align by device", expectedHeader2, retArray2);
+  }
+
+  /** Count with {@code s1 <= 4} and the default ordering: d1 and d2 keep 2 points, d3 none. */
+  @Test
+  public void testCountAlignByDeviceOrderByDeviceWithValueFilter() {
+    // result of SINGLE_SERIES; only s1 exists there, so only count(s1) is selected
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2", "root.single.d3,0"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** where s1 <= 4 align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2", "root.sg.d3,0,0"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** where s1 <= 4 align by device",
+        expectedHeader2,
+        retArray2);
+  }
+
+  /** Unfiltered count with {@code order by time}: every device reports both of its points. */
+  @Test
+  public void testCountAlignByDeviceOrderByTimeWithoutValueFilter() {
+    // result of SINGLE_SERIES
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2", "root.single.d3,2"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** order by time align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2", "root.sg.d3,2,2"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** order by time align by device",
+        expectedHeader2,
+        retArray2);
+  }
+
+  /** Count with {@code s1 <= 4} and {@code order by time}: d1 and d2 keep 2 points, d3 none. */
+  @Test
+  public void testCountAlignByDeviceOrderByTimeWithValueFilter() {
+    // result of SINGLE_SERIES; d2 holds 3 and 4, both of which satisfy s1 <= 4
+    String expectedHeader1 = "Device,count(s1)";
+    String[] retArray1 = new String[] {"root.single.d1,2", "root.single.d2,2", "root.single.d3,0"};
+
+    resultSetEqualTest(
+        "select count(s1) from root.single.** where s1 <= 4 order by time align by device",
+        expectedHeader1,
+        retArray1);
+
+    // result of MULTI_SERIES; restricted to root.sg so root.single devices are not returned
+    String expectedHeader2 = "Device,count(s1),count(s2)";
+    String[] retArray2 = new String[] {"root.sg.d1,2,2", "root.sg.d2,2,2", "root.sg.d3,0,0"};
+
+    resultSetEqualTest(
+        "select count(s1),count(s2) from root.sg.** where s1 <= 4 order by time align by device",
+        expectedHeader2,
+        retArray2);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index a1ab1c18db..9b62d2faf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.driver;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index ea369d8fad..e0a221d599 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.driver;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index ff55d5456c..0c3a5c07e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
import com.google.common.util.concurrent.ListenableFuture;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
index c02595374c..666e0709e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
@@ -20,9 +20,17 @@
package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ShuffleSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import java.util.List;
+
public interface IMPPDataExchangeManager {
/**
* Create a sink handle who sends data blocks to a remote downstream fragment instance in async
@@ -30,26 +38,17 @@ public interface IMPPDataExchangeManager {
*
* @param localFragmentInstanceId ID of the local fragment instance who generates and sends data
* blocks to the sink handle.
- * @param remoteEndpoint Hostname and Port of the remote fragment instance where the data blocks
- * should be sent to.
- * @param remotePlanNodeId The sink plan node ID of the remote fragment instance.
- * @param remotePlanNodeId The plan node ID of the local fragment instance.
* @param instanceContext The context of local fragment instance.
*/
- ISinkHandle createSinkHandle(
+ ISinkHandle createShuffleSinkHandle(
+ List<DownStreamChannelLocation> downStreamChannelLocationList,
+ DownStreamChannelIndex downStreamChannelIndex,
+ ShuffleSinkHandle.ShuffleStrategyEnum shuffleStrategyEnum,
TFragmentInstanceId localFragmentInstanceId,
- TEndPoint remoteEndpoint,
- TFragmentInstanceId remoteFragmentInstanceId,
- String remotePlanNodeId,
String localPlanNodeId,
FragmentInstanceContext instanceContext);
- ISinkHandle createLocalSinkHandleForFragment(
- TFragmentInstanceId localFragmentInstanceId,
- TFragmentInstanceId remoteFragmentInstanceId,
- String remotePlanNodeId,
- FragmentInstanceContext instanceContext);
-
+ ISinkHandle createLocalSinkHandleForPipeline(DriverContext driverContext, String planNodeId);
/**
* Create a source handle who fetches data blocks from a remote upstream fragment instance for a
* plan node of a local fragment instance in async manner.
@@ -65,6 +64,7 @@ public interface IMPPDataExchangeManager {
ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
+ int indexOfUpstreamSinkHandle,
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback);
@@ -73,8 +73,12 @@ public interface IMPPDataExchangeManager {
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
TFragmentInstanceId remoteFragmentInstanceId,
+ int index,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback);
+ /** The SharedTsBlockQueue must belong to the corresponding LocalSinkHandle. */
+ ISourceHandle createLocalSourceHandleForPipeline(SharedTsBlockQueue queue, DriverContext context);
+
/**
* Release all the related resources of a fragment instance, including data blocks that are not
* yet fetched by downstream fragment instances.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index f51c4b77cb..0b116a6cbc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -23,6 +23,15 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ShuffleSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.SinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -43,13 +52,16 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode;
import static org.apache.iotdb.db.mpp.common.FragmentInstanceId.createFullId;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.GET_DATA_BLOCK_TASK_SERVER;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER;
@@ -60,25 +72,9 @@ import static org.apache.iotdb.db.mpp.metric.DataExchangeCountMetricSet.SEND_NEW
public class MPPDataExchangeManager implements IMPPDataExchangeManager {
- private static final Logger logger = LoggerFactory.getLogger(MPPDataExchangeManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MPPDataExchangeManager.class);
- public interface SourceHandleListener {
- void onFinished(ISourceHandle sourceHandle);
-
- void onAborted(ISourceHandle sourceHandle);
-
- void onFailure(ISourceHandle sourceHandle, Throwable t);
- }
-
- public interface SinkHandleListener {
- void onFinish(ISinkHandle sinkHandle);
-
- void onEndOfBlocks(ISinkHandle sinkHandle);
-
- Optional<Throwable> onAborted(ISinkHandle sinkHandle);
-
- void onFailure(ISinkHandle sinkHandle, Throwable t);
- }
+ // region =========== MPPDataExchangeServiceImpl ===========
/** Handle thrift communications. */
class MPPDataExchangeServiceImpl implements MPPDataExchangeService.Iface {
@@ -94,18 +90,23 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
req.sourceFragmentInstanceId.queryId,
req.sourceFragmentInstanceId.fragmentId,
req.sourceFragmentInstanceId.instanceId))) {
- logger.debug(
+ LOGGER.debug(
"[ProcessGetTsBlockRequest] sequence ID in [{}, {})",
req.getStartSequenceId(),
req.getEndSequenceId());
- if (!sinkHandles.containsKey(req.getSourceFragmentInstanceId())) {
+ if (!shuffleSinkHandles.containsKey(req.getSourceFragmentInstanceId())) {
throw new TException(
"Source fragment instance not found. Fragment instance ID: "
+ req.getSourceFragmentInstanceId()
+ ".");
}
TGetDataBlockResponse resp = new TGetDataBlockResponse();
- SinkHandle sinkHandle = (SinkHandle) sinkHandles.get(req.getSourceFragmentInstanceId());
+ // the channel at the requested index must be a SinkHandle
+ SinkHandle sinkHandle =
+ (SinkHandle)
+ (shuffleSinkHandles
+ .get(req.getSourceFragmentInstanceId())
+ .getChannel(req.getIndex()));
for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
try {
ByteBuffer serializedTsBlock = sinkHandle.getSerializedTsBlock(i);
@@ -132,21 +133,23 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
e.sourceFragmentInstanceId.queryId,
e.sourceFragmentInstanceId.fragmentId,
e.sourceFragmentInstanceId.instanceId))) {
- logger.debug(
+ LOGGER.debug(
"Acknowledge data block event received, for data blocks whose sequence ID in [{}, {}) from {}.",
e.getStartSequenceId(),
e.getEndSequenceId(),
e.getSourceFragmentInstanceId());
- if (!sinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
- logger.debug(
+ if (!shuffleSinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
+ LOGGER.debug(
"received ACK event but target FragmentInstance[{}] is not found.",
e.getSourceFragmentInstanceId());
return;
}
- ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
+ // the channel at the requested index must be a SinkHandle
+ ((SinkHandle)
+ (shuffleSinkHandles.get(e.getSourceFragmentInstanceId()).getChannel(e.getIndex())))
.acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
} catch (Throwable t) {
- logger.warn(
+ LOGGER.warn(
"ack TsBlock [{}, {}) failed.", e.getStartSequenceId(), e.getEndSequenceId(), t);
throw t;
} finally {
@@ -162,7 +165,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
long startTime = System.nanoTime();
try (SetThreadName fragmentInstanceName =
new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) {
- logger.debug(
+ LOGGER.debug(
"New data block event received, for plan node {} of {} from {}.",
e.getTargetPlanNodeId(),
e.getTargetFragmentInstanceId(),
@@ -180,7 +183,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
// may
// have already been stopped. For example, in the query with LimitOperator, the downstream
// FragmentInstance may be finished, although the upstream is still working.
- logger.debug(
+ LOGGER.debug(
"received NewDataBlockEvent but the downstream FragmentInstance[{}] is not found",
e.getTargetFragmentInstanceId());
return;
@@ -198,7 +201,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
public void onEndOfDataBlockEvent(TEndOfDataBlockEvent e) throws TException {
try (SetThreadName fragmentInstanceName =
new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) {
- logger.debug(
+ LOGGER.debug(
"End of data block event received, for plan node {} of {} from {}.",
e.getTargetPlanNodeId(),
e.getTargetFragmentInstanceId(),
@@ -212,7 +215,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
: (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
if (sourceHandle == null || sourceHandle.isAborted() || sourceHandle.isFinished()) {
- logger.debug(
+ LOGGER.debug(
"received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found",
e.getTargetFragmentInstanceId());
return;
@@ -223,6 +226,28 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
}
}
+ // endregion
+
+ // region =========== listener ===========
+
+ /** Callbacks through which state changes of a source handle are reported. */
+ public interface SourceHandleListener {
+ /** Invoked when {@code sourceHandle} has finished. */
+ void onFinished(ISourceHandle sourceHandle);
+
+ /** Invoked when {@code sourceHandle} has been aborted. */
+ void onAborted(ISourceHandle sourceHandle);
+
+ /** Invoked when {@code sourceHandle} has failed; {@code t} is the failure being reported. */
+ void onFailure(ISourceHandle sourceHandle, Throwable t);
+ }
+
+ /** Callbacks through which state changes of a sink handle are reported. */
+ public interface SinkHandleListener {
+ /** Invoked when {@code sinkHandle} has finished. */
+ void onFinish(ISinkHandle sinkHandle);
+
+ /** Invoked when {@code sinkHandle} has reached the end of its blocks. */
+ void onEndOfBlocks(ISinkHandle sinkHandle);
+
+ /**
+ * Invoked when {@code sinkHandle} has been aborted.
+ *
+ * @return an optional failure cause; the implementations in this class return the failure cause
+ * of their FragmentInstanceContext.
+ */
+ Optional<Throwable> onAborted(ISinkHandle sinkHandle);
+
+ /** Invoked when {@code sinkHandle} has failed; {@code t} is the failure being reported. */
+ void onFailure(ISinkHandle sinkHandle, Throwable t);
+ }
+
/** Listen to the state changes of a source handle. */
class SourceHandleListenerImpl implements SourceHandleListener {
@@ -234,12 +259,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
@Override
public void onFinished(ISourceHandle sourceHandle) {
- logger.debug("[ScHListenerOnFinish]");
+ LOGGER.debug("[ScHListenerOnFinish]");
Map<String, ISourceHandle> sourceHandleMap =
sourceHandles.get(sourceHandle.getLocalFragmentInstanceId());
if (sourceHandleMap == null
|| sourceHandleMap.remove(sourceHandle.getLocalPlanNodeId()) == null) {
- logger.debug("[ScHListenerAlreadyReleased]");
+ LOGGER.debug("[ScHListenerAlreadyReleased]");
}
if (sourceHandleMap != null && sourceHandleMap.isEmpty()) {
@@ -249,13 +274,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
@Override
public void onAborted(ISourceHandle sourceHandle) {
- logger.debug("[ScHListenerOnAbort]");
+ LOGGER.debug("[ScHListenerOnAbort]");
onFinished(sourceHandle);
}
@Override
public void onFailure(ISourceHandle sourceHandle, Throwable t) {
- logger.warn("Source handle failed due to: ", t);
+ LOGGER.warn("Source handle failed due to: ", t);
if (onFailureCallback != null) {
onFailureCallback.call(t);
}
@@ -277,17 +302,17 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
@Override
public void onFinished(ISourceHandle sourceHandle) {
- logger.debug("[ScHListenerOnFinish]");
+ LOGGER.debug("[ScHListenerOnFinish]");
}
@Override
public void onAborted(ISourceHandle sourceHandle) {
- logger.debug("[ScHListenerOnAbort]");
+ LOGGER.debug("[ScHListenerOnAbort]");
}
@Override
public void onFailure(ISourceHandle sourceHandle, Throwable t) {
- logger.warn("Source handle failed due to: ", t);
+ LOGGER.warn("Source handle failed due to: ", t);
if (onFailureCallback != null) {
onFailureCallback.call(t);
}
@@ -295,12 +320,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
}
/** Listen to the state changes of a sink handle. */
- class SinkHandleListenerImpl implements SinkHandleListener {
+ class ShuffleSinkHandleListenerImpl implements SinkHandleListener {
private final FragmentInstanceContext context;
private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;
- public SinkHandleListenerImpl(
+ public ShuffleSinkHandleListenerImpl(
FragmentInstanceContext context,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
this.context = context;
@@ -309,36 +334,28 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
@Override
public void onFinish(ISinkHandle sinkHandle) {
- logger.debug("[SkHListenerOnFinish]");
- removeFromMPPDataExchangeManager(sinkHandle);
+ LOGGER.debug("[ShuffleSinkHandleListenerOnFinish]");
+ shuffleSinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
context.finished();
}
@Override
public void onEndOfBlocks(ISinkHandle sinkHandle) {
- logger.debug("[SkHListenerOnEndOfTsBlocks]");
+ LOGGER.debug("[ShuffleSinkHandleListenerOnEndOfTsBlocks]");
context.transitionToFlushing();
}
@Override
public Optional<Throwable> onAborted(ISinkHandle sinkHandle) {
- logger.debug("[SkHListenerOnAbort]");
- removeFromMPPDataExchangeManager(sinkHandle);
+ LOGGER.debug("[ShuffleSinkHandleListenerOnAbort]");
+ shuffleSinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
return context.getFailureCause();
}
- private void removeFromMPPDataExchangeManager(ISinkHandle sinkHandle) {
- if (sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()) == null) {
- logger.debug("[RemoveNoSinkHandle]");
- } else {
- logger.debug("[RemoveSinkHandle]");
- }
- }
-
@Override
public void onFailure(ISinkHandle sinkHandle, Throwable t) {
// TODO: (xingtanzjr) should we remove the sinkHandle from MPPDataExchangeManager ?
- logger.warn("Sink handle failed due to", t);
+ LOGGER.warn("Sink handle failed due to", t);
if (onFailureCallback != null) {
onFailureCallback.call(t);
}
@@ -350,12 +367,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
* handle doesn't equal the finish of the whole fragment, therefore we don't need to notify
* fragment context. But if it's aborted or failed, it can lead to the total fail.
*/
- static class PipelineSinkHandleListenerImpl implements SinkHandleListener {
+ static class SinkHandleListenerImpl implements SinkHandleListener {
private final FragmentInstanceContext context;
private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;
- public PipelineSinkHandleListenerImpl(
+ public SinkHandleListenerImpl(
FragmentInstanceContext context,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
this.context = context;
@@ -364,36 +381,42 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
@Override
public void onFinish(ISinkHandle sinkHandle) {
- logger.debug("[SkHListenerOnFinish]");
+ LOGGER.debug("[SkHListenerOnFinish]");
}
@Override
public void onEndOfBlocks(ISinkHandle sinkHandle) {
- logger.debug("[SkHListenerOnEndOfTsBlocks]");
+ LOGGER.debug("[SkHListenerOnEndOfTsBlocks]");
}
@Override
public Optional<Throwable> onAborted(ISinkHandle sinkHandle) {
- logger.debug("[SkHListenerOnAbort]");
+ LOGGER.debug("[SkHListenerOnAbort]");
return context.getFailureCause();
}
@Override
public void onFailure(ISinkHandle sinkHandle, Throwable t) {
- logger.warn("Sink handle failed due to", t);
+ LOGGER.warn("Sink handle failed due to", t);
if (onFailureCallback != null) {
onFailureCallback.call(t);
}
}
}
+ // endregion
+
+ // region =========== MPPDataExchangeManager ===========
+
private final LocalMemoryManager localMemoryManager;
private final Supplier<TsBlockSerde> tsBlockSerdeFactory;
private final ExecutorService executorService;
private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager;
private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles;
- private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles;
+
+ /** Each FI has only one ShuffleSinkHandle. */
+ private final Map<TFragmentInstanceId, ISinkHandle> shuffleSinkHandles;
private MPPDataExchangeServiceImpl mppDataExchangeService;
@@ -409,7 +432,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
this.mppDataExchangeServiceClientManager =
Validate.notNull(mppDataExchangeServiceClientManager);
sourceHandles = new ConcurrentHashMap<>();
- sinkHandles = new ConcurrentHashMap<>();
+ shuffleSinkHandles = new ConcurrentHashMap<>();
}
public MPPDataExchangeServiceImpl getOrCreateMPPDataExchangeServiceImpl() {
@@ -419,7 +442,6 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
return mppDataExchangeService;
}
- @Override
public synchronized ISinkHandle createLocalSinkHandleForFragment(
TFragmentInstanceId localFragmentInstanceId,
TFragmentInstanceId remoteFragmentInstanceId,
@@ -427,12 +449,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
// TODO: replace with callbacks to decouple MPPDataExchangeManager from
// FragmentInstanceContext
FragmentInstanceContext instanceContext) {
- if (sinkHandles.containsKey(localFragmentInstanceId)) {
- throw new IllegalStateException(
- "Local sink handle for " + localFragmentInstanceId + " exists.");
- }
- logger.debug(
+ LOGGER.debug(
"Create local sink handle to plan node {} of {} for {}",
remotePlanNodeId,
remoteFragmentInstanceId,
@@ -443,23 +461,20 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
LocalSourceHandle localSourceHandle =
sourceHandleMap == null ? null : (LocalSourceHandle) sourceHandleMap.get(remotePlanNodeId);
if (localSourceHandle != null) {
- logger.debug("Get shared tsblock queue from local source handle");
+ LOGGER.debug("Get SharedTsBlockQueue from local source handle");
queue =
((LocalSourceHandle) sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
.getSharedTsBlockQueue();
} else {
- logger.debug("Create shared tsblock queue");
+ LOGGER.debug("Create SharedTsBlockQueue");
queue =
new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager);
}
- LocalSinkHandle localSinkHandle =
- new LocalSinkHandle(
- localFragmentInstanceId,
- queue,
- new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
- sinkHandles.put(localFragmentInstanceId, localSinkHandle);
- return localSinkHandle;
+ return new LocalSinkHandle(
+ localFragmentInstanceId,
+ queue,
+ new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
}
/**
@@ -468,7 +483,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
*/
public ISinkHandle createLocalSinkHandleForPipeline(
DriverContext driverContext, String planNodeId) {
- logger.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
+ LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
SharedTsBlockQueue queue =
new SharedTsBlockQueue(
driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
@@ -476,11 +491,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
localMemoryManager);
return new LocalSinkHandle(
queue,
- new PipelineSinkHandleListenerImpl(
+ new SinkHandleListenerImpl(
driverContext.getFragmentInstanceContext(), driverContext::failed));
}
- @Override
public ISinkHandle createSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
TEndPoint remoteEndpoint,
@@ -490,30 +504,84 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
// TODO: replace with callbacks to decouple MPPDataExchangeManager from
// FragmentInstanceContext
FragmentInstanceContext instanceContext) {
- if (sinkHandles.containsKey(localFragmentInstanceId)) {
- throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
- }
- logger.debug(
+ LOGGER.debug(
"Create sink handle to plan node {} of {} for {}",
remotePlanNodeId,
remoteFragmentInstanceId,
localFragmentInstanceId);
- SinkHandle sinkHandle =
- new SinkHandle(
- remoteEndpoint,
- remoteFragmentInstanceId,
- remotePlanNodeId,
- localPlanNodeId,
+ return new SinkHandle(
+ remoteEndpoint,
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ localPlanNodeId,
+ localFragmentInstanceId,
+ localMemoryManager,
+ executorService,
+ tsBlockSerdeFactory.get(),
+ new SinkHandleListenerImpl(instanceContext, instanceContext::failed),
+ mppDataExchangeServiceClientManager);
+ }
+
+ /**
+ * Creates the ShuffleSinkHandle of a fragment instance and registers it in {@code
+ * shuffleSinkHandles} under {@code localFragmentInstanceId}.
+ *
+ * <p>One channel is created for every entry of {@code downStreamChannelLocationList} through
+ * {@code createHandleForShuffleSink}; the resulting list, {@code downStreamChannelIndex} and
+ * {@code shuffleStrategyEnum} are handed to the ShuffleSinkHandle constructor unchanged.
+ *
+ * @throws IllegalStateException if a ShuffleSinkHandle is already registered for {@code
+ * localFragmentInstanceId}.
+ */
+ @Override
+ public ISinkHandle createShuffleSinkHandle(
+ List<DownStreamChannelLocation> downStreamChannelLocationList,
+ DownStreamChannelIndex downStreamChannelIndex,
+ ShuffleSinkHandle.ShuffleStrategyEnum shuffleStrategyEnum,
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
+ // TODO: replace with callbacks to decouple MPPDataExchangeManager from
+ // FragmentInstanceContext
+ FragmentInstanceContext instanceContext) {
+ // NOTE(review): this check and the put below are separate steps and the method is not
+ // synchronized - confirm callers never create the handle of one FI concurrently.
+ if (shuffleSinkHandles.containsKey(localFragmentInstanceId)) {
+ throw new IllegalStateException(
+ "ShuffleSinkHandle for " + localFragmentInstanceId + " exists.");
+ }
+
+ // Channels keep the order of downStreamChannelLocationList.
+ List<ISinkHandle> downStreamChannelList =
+ downStreamChannelLocationList.stream()
+ .map(
+ downStreamChannelLocation ->
+ createHandleForShuffleSink(
+ localFragmentInstanceId,
+ localPlanNodeId,
+ downStreamChannelLocation,
+ instanceContext))
+ .collect(Collectors.toList());
+
+ ShuffleSinkHandle shuffleSinkHandle =
+ new ShuffleSinkHandle(
localFragmentInstanceId,
- localMemoryManager,
- executorService,
- tsBlockSerdeFactory.get(),
- new SinkHandleListenerImpl(instanceContext, instanceContext::failed),
- mppDataExchangeServiceClientManager);
- sinkHandles.put(localFragmentInstanceId, sinkHandle);
- return sinkHandle;
+ downStreamChannelList,
+ downStreamChannelIndex,
+ shuffleStrategyEnum,
+ localPlanNodeId,
+ new ShuffleSinkHandleListenerImpl(instanceContext, instanceContext::failed));
+ shuffleSinkHandles.put(localFragmentInstanceId, shuffleSinkHandle);
+ return shuffleSinkHandle;
+ }
+
+ /**
+ * Creates the channel for one downstream location of a ShuffleSinkHandle.
+ *
+ * <p>When {@code isSameNode} accepts the location's remote endpoint the channel is built by
+ * {@code createLocalSinkHandleForFragment}; otherwise it is built by {@code createSinkHandle}
+ * with the remote endpoint and {@code localPlanNodeId}.
+ */
+ private ISinkHandle createHandleForShuffleSink(
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
+ DownStreamChannelLocation downStreamChannelLocation,
+ FragmentInstanceContext instanceContext) {
+ if (isSameNode(downStreamChannelLocation.getRemoteEndpoint())) {
+ return createLocalSinkHandleForFragment(
+ localFragmentInstanceId,
+ downStreamChannelLocation.getRemoteFragmentInstanceId(),
+ downStreamChannelLocation.getRemotePlanNodeId(),
+ instanceContext);
+ } else {
+ return createSinkHandle(
+ localFragmentInstanceId,
+ downStreamChannelLocation.getRemoteEndpoint(),
+ downStreamChannelLocation.getRemoteFragmentInstanceId(),
+ downStreamChannelLocation.getRemotePlanNodeId(),
+ localPlanNodeId,
+ instanceContext);
+ }
}
/**
@@ -522,18 +590,18 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
*/
public ISourceHandle createLocalSourceHandleForPipeline(
SharedTsBlockQueue queue, DriverContext context) {
- logger.debug("Create local source handle for {}", context.getDriverTaskID());
+ LOGGER.debug("Create local source handle for {}", context.getDriverTaskID());
return new LocalSourceHandle(
queue,
new PipelineSourceHandleListenerImpl(context::failed),
context.getDriverTaskID().toString());
}
- @Override
public synchronized ISourceHandle createLocalSourceHandleForFragment(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
TFragmentInstanceId remoteFragmentInstanceId,
+ int index,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
if (sourceHandles.containsKey(localFragmentInstanceId)
&& sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
@@ -545,17 +613,19 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
+ " exists.");
}
- logger.debug(
+ LOGGER.debug(
"Create local source handle from {} for plan node {} of {}",
remoteFragmentInstanceId,
localPlanNodeId,
localFragmentInstanceId);
SharedTsBlockQueue queue;
- if (sinkHandles.containsKey(remoteFragmentInstanceId)) {
- logger.debug("Get shared tsblock queue from local sink handle");
- queue = ((LocalSinkHandle) sinkHandles.get(remoteFragmentInstanceId)).getSharedTsBlockQueue();
+ if (shuffleSinkHandles.containsKey(remoteFragmentInstanceId)) {
+ LOGGER.debug("Get SharedTsBlockQueue from local sink handle");
+ queue =
+ ((LocalSinkHandle) shuffleSinkHandles.get(remoteFragmentInstanceId).getChannel(index))
+ .getSharedTsBlockQueue();
} else {
- logger.debug("Create shared tsblock queue");
+ LOGGER.debug("Create SharedTsBlockQueue");
queue = new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, localMemoryManager);
}
LocalSourceHandle localSourceHandle =
@@ -574,6 +644,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
public ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
+ int indexOfUpstreamSinkHandle,
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
@@ -587,7 +658,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
+ " exists.");
}
- logger.debug(
+ LOGGER.debug(
"Create source handle from {} for plan node {} of {}",
remoteFragmentInstanceId,
localPlanNodeId,
@@ -599,6 +670,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
+ indexOfUpstreamSinkHandle,
localMemoryManager,
executorService,
tsBlockSerdeFactory.get(),
@@ -617,21 +689,21 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
* <p>This method should be called when a fragment instance finished in an abnormal state.
*/
public void forceDeregisterFragmentInstance(TFragmentInstanceId fragmentInstanceId) {
- logger.debug("[StartForceReleaseFIDataExchangeResource]");
- ISinkHandle sinkHandle = sinkHandles.get(fragmentInstanceId);
+ LOGGER.debug("[StartForceReleaseFIDataExchangeResource]");
+ ISinkHandle sinkHandle = shuffleSinkHandles.get(fragmentInstanceId);
if (sinkHandle != null) {
sinkHandle.abort();
- sinkHandles.remove(fragmentInstanceId);
+ shuffleSinkHandles.remove(fragmentInstanceId);
}
Map<String, ISourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId);
if (planNodeIdToSourceHandle != null) {
for (Entry<String, ISourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
- logger.debug("[CloseSourceHandle] {}", entry.getKey());
+ LOGGER.debug("[CloseSourceHandle] {}", entry.getKey());
entry.getValue().abort();
}
sourceHandles.remove(fragmentInstanceId);
}
- logger.debug("[EndForceReleaseFIDataExchangeResource]");
+ LOGGER.debug("[EndForceReleaseFIDataExchangeResource]");
}
/** @param suffix should be like [PlanNodeId].SourceHandle/SinkHandle */
@@ -643,4 +715,5 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
+ "."
+ suffix;
}
+ // endregion
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index a40202de46..46048fdb96 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -122,6 +124,10 @@ public class SharedTsBlockQueue {
return queue.isEmpty();
}
+ /** Returns the current size of the underlying {@code queue}. */
+ public int getNumOfBufferedTsBlocks() {
+ return queue.size();
+ }
+
public void setSinkHandle(LocalSinkHandle sinkHandle) {
this.sinkHandle = sinkHandle;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelIndex.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelIndex.java
new file mode 100644
index 0000000000..cd36b18ae6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelIndex.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
+
+ /**
+ * Mutable holder of a single {@code int} channel index.
+ *
+ * <p>NOTE(review): the class has no synchronization of its own - confirm how instances are
+ * shared before reading or updating the index from several threads.
+ */
+ public class DownStreamChannelIndex {
+ /** CurrentIndex of downstream ISourceHandle */
+ private int currentIndex;
+
+ /** @param currentIndex initial value of the index. */
+ public DownStreamChannelIndex(int currentIndex) {
+ this.currentIndex = currentIndex;
+ }
+
+ /** Returns the index as last set through the constructor or {@link #setCurrentIndex(int)}. */
+ public int getCurrentIndex() {
+ return currentIndex;
+ }
+
+ /** Replaces the stored index with {@code currentIndex}. */
+ public void setCurrentIndex(int currentIndex) {
+ this.currentIndex = currentIndex;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelLocation.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelLocation.java
new file mode 100644
index 0000000000..83a74b0d55
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/DownStreamChannelLocation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+ /**
+ * Immutable description of where one downstream channel of a sink sends its data: the remote
+ * endpoint, the remote fragment instance and the remote plan node.
+ *
+ * <p>Serialized layout, shared by both {@code serialize} overloads and {@link
+ * #deserialize(ByteBuffer)}: endpoint ip, endpoint port, queryId, fragmentId, instanceId, remote
+ * plan node id.
+ */
+ public class DownStreamChannelLocation {
+
+ private final TEndPoint remoteEndpoint;
+ private final TFragmentInstanceId remoteFragmentInstanceId;
+ private final String remotePlanNodeId;
+
+ /**
+ * @param remoteEndpoint Hostname and Port of the remote fragment instance where the data blocks
+ * should be sent to.
+ * @param remoteFragmentInstanceId The ID of the remote fragment instance.
+ * @param remotePlanNodeId The plan node ID of the remote exchangeNode.
+ */
+ public DownStreamChannelLocation(
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ String remotePlanNodeId) {
+ this.remoteEndpoint = remoteEndpoint;
+ this.remoteFragmentInstanceId = remoteFragmentInstanceId;
+ this.remotePlanNodeId = remotePlanNodeId;
+ }
+
+ public TEndPoint getRemoteEndpoint() {
+ return remoteEndpoint;
+ }
+
+ public TFragmentInstanceId getRemoteFragmentInstanceId() {
+ return remoteFragmentInstanceId;
+ }
+
+ public String getRemotePlanNodeId() {
+ return remotePlanNodeId;
+ }
+
+ /**
+ * Writes this location into {@code byteBuffer} using the layout described on the class.
+ *
+ * <p>The fragment instance id is written as queryId, fragmentId, instanceId so that it matches
+ * the order in which {@link #deserialize(ByteBuffer)} feeds the values to the {@code
+ * TFragmentInstanceId} constructor. Writing instanceId first and queryId last made a round trip
+ * swap the two strings (assumes the Thrift-generated constructor takes queryId, fragmentId,
+ * instanceId, i.e. the struct's field order).
+ */
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(remoteEndpoint.getIp(), byteBuffer);
+ ReadWriteIOUtils.write(remoteEndpoint.getPort(), byteBuffer);
+ ReadWriteIOUtils.write(remoteFragmentInstanceId.getQueryId(), byteBuffer);
+ ReadWriteIOUtils.write(remoteFragmentInstanceId.getFragmentId(), byteBuffer);
+ ReadWriteIOUtils.write(remoteFragmentInstanceId.getInstanceId(), byteBuffer);
+ ReadWriteIOUtils.write(remotePlanNodeId, byteBuffer);
+ }
+
+ /**
+ * Writes this location into {@code stream}; same layout and field order as {@link
+ * #serialize(ByteBuffer)}.
+ *
+ * @throws IOException if writing to {@code stream} fails.
+ */
+ public void serialize(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(remoteEndpoint.getIp(), stream);
+ ReadWriteIOUtils.write(remoteEndpoint.getPort(), stream);
+ ReadWriteIOUtils.write(remoteFragmentInstanceId.getQueryId(), stream);
+ ReadWriteIOUtils.write(remoteFragmentInstanceId.getFragmentId(), stream);
+ ReadWriteIOUtils.write(remoteFragmentInstanceId.getInstanceId(), stream);
+ ReadWriteIOUtils.write(remotePlanNodeId, stream);
+ }
+
+ /**
+ * Reads a location from {@code byteBuffer}, consuming the fields in the order the {@code
+ * serialize} overloads write them.
+ *
+ * <p>Constructor arguments are evaluated left to right, so each {@code read*} call below
+ * consumes the next serialized field.
+ */
+ public static DownStreamChannelLocation deserialize(ByteBuffer byteBuffer) {
+ TEndPoint endPoint =
+ new TEndPoint(
+ ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
+ // Values arrive as queryId, fragmentId, instanceId.
+ TFragmentInstanceId fragmentInstanceId =
+ new TFragmentInstanceId(
+ ReadWriteIOUtils.readString(byteBuffer),
+ ReadWriteIOUtils.readInt(byteBuffer),
+ ReadWriteIOUtils.readString(byteBuffer));
+ String remotePlanNodeId = ReadWriteIOUtils.readString(byteBuffer);
+ return new DownStreamChannelLocation(endPoint, fragmentInstanceId, remotePlanNodeId);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkChannel.java
new file mode 100644
index 0000000000..2072136cf7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkChannel.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
+
+ /**
+ * Operations of a single sink channel.
+ *
+ * <p>NOTE(review): the exact contract of each method is defined by the implementing classes
+ * (LocalSinkHandle is declared to implement this interface); the descriptions below follow the
+ * method names and should be confirmed against those implementations.
+ */
+ public interface ISinkChannel {
+
+ /** Opens the channel. */
+ void open();
+
+ /** Whether the channel has been told that no more TsBlocks will be sent. */
+ boolean isNoMoreTsBlocks();
+
+ /** Presumably the number of bytes currently retained by the channel - TODO confirm. */
+ long getRetainedSizeInBytes();
+
+ /** Number of TsBlocks currently buffered by the channel. */
+ int getNumOfBufferedTsBlocks();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
similarity index 66%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
index 4bceda0e15..1a3fc2ae52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
@@ -16,57 +16,67 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.List;
-
public interface ISinkHandle {
/** Get the local fragment instance ID that this sink handle belongs to. */
TFragmentInstanceId getLocalFragmentInstanceId();
- /** Get the total amount of memory used by buffered tsblocks. */
+ /** Get the total amount of memory used by buffered TsBlocks. */
long getBufferRetainedSizeInBytes();
+ default ISinkHandle getChannel(int index) {
+ throw new UnsupportedOperationException();
+ }
+
/** Get a future that will be completed when the output buffer is not full. */
ListenableFuture<?> isFull();
/**
- * Send a list of tsblocks to an unpartitioned output buffer. If no-more-tsblocks has been set,
+ * Send a {@link TsBlock} to an un-partitioned output buffer. If no-more-TsBlocks has been set,
* the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
* will be thrown if any exception happened during the data transmission.
*/
void send(TsBlock tsBlock);
/**
- * Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
- * tsblock call is ignored. This can happen with limit queries. A {@link RuntimeException} will be
- * thrown if any exception happened * during the data transmission.
+ * Notify the handle that there are no more TsBlocks. Any future calls to send a TsBlock should be
+ * ignored.
*/
- void send(int partition, List<TsBlock> tsBlocks);
+ void setNoMoreTsBlocks();
/**
- * Notify the handle that there are no more tsblocks. Any future calls to send a tsblock should be
- * ignored.
+ * Notify the handle that there are no more TsBlocks for the specified channel. Any future calls
+ * to send a TsBlock to the specified channel should be ignored.
+ *
+ * @param channelIndex index of the channel that should be closed
*/
- void setNoMoreTsBlocks();
+ default void setNoMoreTsBlocksOfOneChannel(int channelIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Open specified channel of ISinkHandle. */
+ default void tryOpenChannel(int channelIndex) {
+ throw new UnsupportedOperationException();
+ }
/** If the handle is aborted. */
boolean isAborted();
/**
- * If there are no more tsblocks to be sent and all the tsblocks have been fetched by downstream
+ * If there are no more TsBlocks to be sent and all the TsBlocks have been fetched by downstream
* fragment instances.
*/
boolean isFinished();
/**
- * Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
+ * Abort the sink handle. Discard all TsBlocks which may still be in the memory buffer and cancel
* the future returned by {@link #isFull()}.
*
* <p>Should only be called in abnormal case
@@ -74,7 +84,7 @@ public interface ISinkHandle {
void abort();
/**
- * Close the sink handle. Discard all tsblocks which may still be in the memory buffer and
+ * Close the sink handle. Discard all TsBlocks which may still be in the memory buffer and
* complete the future returned by {@link #isFull()}.
*
* <p>Should only be called in normal case.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkHandle.java
similarity index 83%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkHandle.java
index 8afb07fe6e..48b617a1dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkHandle.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -29,15 +30,14 @@ import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
import java.util.Optional;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL;
-public class LocalSinkHandle implements ISinkHandle {
+public class LocalSinkHandle implements ISinkHandle, ISinkChannel {
- private static final Logger logger = LoggerFactory.getLogger(LocalSinkHandle.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LocalSinkHandle.class);
private TFragmentInstanceId localFragmentInstanceId;
private final SinkHandleListener sinkHandleListener;
@@ -47,6 +47,8 @@ public class LocalSinkHandle implements ISinkHandle {
private boolean aborted = false;
private boolean closed = false;
+ private boolean noMoreTsBlocks = false;
+
private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
public LocalSinkHandle(SharedTsBlockQueue queue, SinkHandleListener sinkHandleListener) {
@@ -122,7 +124,7 @@ public class LocalSinkHandle implements ISinkHandle {
if (queue.hasNoMoreTsBlocks()) {
return;
}
- logger.debug("[StartSendTsBlockOnLocal]");
+ LOGGER.debug("[StartSendTsBlockOnLocal]");
synchronized (this) {
blocked = queue.add(tsBlock);
}
@@ -133,16 +135,12 @@ public class LocalSinkHandle implements ISinkHandle {
}
}
- @Override
- public synchronized void send(int partition, List<TsBlock> tsBlocks) {
- throw new UnsupportedOperationException();
- }
-
@Override
public void setNoMoreTsBlocks() {
synchronized (queue) {
synchronized (this) {
- logger.debug("[StartSetNoMoreTsBlocksOnLocal]");
+ LOGGER.debug("[StartSetNoMoreTsBlocksOnLocal]");
+ noMoreTsBlocks = true;
if (aborted || closed) {
return;
}
@@ -151,12 +149,12 @@ public class LocalSinkHandle implements ISinkHandle {
}
}
checkAndInvokeOnFinished();
- logger.debug("[EndSetNoMoreTsBlocksOnLocal]");
+ LOGGER.debug("[EndSetNoMoreTsBlocksOnLocal]");
}
@Override
public void abort() {
- logger.debug("[StartAbortLocalSinkHandle]");
+ LOGGER.debug("[StartAbortLocalSinkHandle]");
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
@@ -171,12 +169,12 @@ public class LocalSinkHandle implements ISinkHandle {
}
}
}
- logger.debug("[EndAbortLocalSinkHandle]");
+ LOGGER.debug("[EndAbortLocalSinkHandle]");
}
@Override
public void close() {
- logger.debug("[StartCloseLocalSinkHandle]");
+ LOGGER.debug("[StartCloseLocalSinkHandle]");
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
@@ -187,7 +185,7 @@ public class LocalSinkHandle implements ISinkHandle {
sinkHandleListener.onFinish(this);
}
}
- logger.debug("[EndCloseLocalSinkHandle]");
+ LOGGER.debug("[EndCloseLocalSinkHandle]");
}
public SharedTsBlockQueue getSharedTsBlockQueue() {
@@ -207,4 +205,26 @@ public class LocalSinkHandle implements ISinkHandle {
// do nothing, the maxBytesCanReserve of SharedTsBlockQueue should be set by corresponding
// LocalSourceHandle
}
+
+ // region ============ ISinkChannel related ============
+
+ @Override
+ public void open() {}
+
+ @Override
+ public boolean isNoMoreTsBlocks() {
+ return noMoreTsBlocks;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return queue.getBufferRetainedSizeInBytes();
+ }
+
+ @Override
+ public int getNumOfBufferedTsBlocks() {
+ return queue.getNumOfBufferedTsBlocks();
+ }
+
+ // endregion
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
new file mode 100644
index 0000000000..4a4b7cbde4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE;
+
+public class ShuffleSinkHandle implements ISinkHandle {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleSinkHandle.class);
+
+ /** Each ISinkHandle in the list matches one downStream ISourceHandle */
+ private final List<ISinkHandle> downStreamChannelList;
+
+ private final boolean[] hasSetNoMoreTsBlocks;
+
+ private final boolean[] channelOpened;
+
+ private final DownStreamChannelIndex downStreamChannelIndex;
+
+ private final int channelNum;
+
+ private final ShuffleStrategy shuffleStrategy;
+
+ private final String localPlanNodeId;
+
+ private final TFragmentInstanceId localFragmentInstanceId;
+
+ private final MPPDataExchangeManager.SinkHandleListener sinkHandleListener;
+
+ private boolean aborted = false;
+
+ private boolean closed = false;
+
+ private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
+ /** max bytes this ShuffleSinkHandle can reserve. */
+ private long maxBytesCanReserve =
+ IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
+
+ /**
+  * Creates a ShuffleSinkHandle over the given downstream channels and opens the first channel.
+  *
+  * @param localFragmentInstanceId id of the local fragment instance, must not be null
+  * @param downStreamChannelList downstream channels, must not be null; each element is cast to
+  *     {@link ISinkChannel} when it is opened
+  * @param downStreamChannelIndex holder of the index of the currently selected channel, must not
+  *     be null
+  * @param shuffleStrategyEnum strategy used to choose the next channel after each send
+  * @param localPlanNodeId id of the local plan node, must not be null
+  * @param sinkHandleListener listener notified on end of blocks, abort and finish, must not be
+  *     null
+  */
+ public ShuffleSinkHandle(
+     TFragmentInstanceId localFragmentInstanceId,
+     List<ISinkHandle> downStreamChannelList,
+     DownStreamChannelIndex downStreamChannelIndex,
+     ShuffleStrategyEnum shuffleStrategyEnum,
+     String localPlanNodeId,
+     MPPDataExchangeManager.SinkHandleListener sinkHandleListener) {
+   this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+   this.downStreamChannelList = Validate.notNull(downStreamChannelList);
+   this.downStreamChannelIndex = Validate.notNull(downStreamChannelIndex);
+   this.localPlanNodeId = Validate.notNull(localPlanNodeId);
+   this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+   this.channelNum = downStreamChannelList.size();
+   this.hasSetNoMoreTsBlocks = new boolean[channelNum];
+   this.channelOpened = new boolean[channelNum];
+   // The strategy must be created only after channelNum is assigned: SimpleRoundRobinStrategy
+   // divides by channelNum in its field initializer, and an unassigned final int reads as 0.
+   this.shuffleStrategy = getShuffleStrategy(shuffleStrategyEnum);
+   // open first channel
+   tryOpenChannel(0);
+ }
+
+ @Override
+ public TFragmentInstanceId getLocalFragmentInstanceId() {
+ return localFragmentInstanceId;
+ }
+
+ public ISinkHandle getChannel(int index) {
+ return downStreamChannelList.get(index);
+ }
+
+ @Override
+ public synchronized ListenableFuture<?> isFull() {
+ // It is safe to use currentSinkHandle.isFull() to judge whether we can send a TsBlock only when
+ // downStreamChannelIndex will not be changed between we call isFull() and send() of
+ // ShuffleSinkHandle
+ ISinkHandle currentSinkHandle =
+ downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
+ return currentSinkHandle.isFull();
+ }
+
+ /**
+  * Sends the TsBlock through the currently selected downstream channel, then lets the shuffle
+  * strategy pick the channel for the next call.
+  *
+  * @param tsBlock the block handed to the current channel's {@code send}
+  * @throws IllegalStateException if this handle has been aborted or closed (see checkState())
+  */
+ @Override
+ public synchronized void send(TsBlock tsBlock) {
+ long startTime = System.nanoTime();
+ try {
+ ISinkHandle currentSinkHandle =
+ downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
+ checkState();
+ currentSinkHandle.send(tsBlock);
+ } finally {
+ // Runs even when checkState() or the channel's send() throws: the strategy still advances,
+ // the newly selected channel is opened if needed, and the elapsed time is still recorded.
+ switchChannelIfNecessary();
+ QUERY_METRICS.recordDataExchangeCost(
+ SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
+ }
+ }
+
+ @Override
+ public synchronized void setNoMoreTsBlocks() {
+ for (int i = 0; i < downStreamChannelList.size(); i++) {
+ if (!hasSetNoMoreTsBlocks[i]) {
+ downStreamChannelList.get(i).setNoMoreTsBlocks();
+ hasSetNoMoreTsBlocks[i] = true;
+ }
+ }
+ sinkHandleListener.onEndOfBlocks(this);
+ }
+
+ @Override
+ public synchronized void setNoMoreTsBlocksOfOneChannel(int channelIndex) {
+ if (!hasSetNoMoreTsBlocks[channelIndex]) {
+ downStreamChannelList.get(channelIndex).setNoMoreTsBlocks();
+ hasSetNoMoreTsBlocks[channelIndex] = true;
+ }
+ }
+
+ @Override
+ public synchronized boolean isAborted() {
+ return aborted;
+ }
+
+ /**
+  * Reports whether every downstream channel is finished.
+  *
+  * @return true only if all channels report finished (also true when there are no channels)
+  */
+ @Override
+ public synchronized boolean isFinished() {
+   // allMatch short-circuits on the first unfinished channel, like the explicit loop would.
+   return downStreamChannelList.stream().allMatch(ISinkHandle::isFinished);
+ }
+
+ /**
+  * Aborts every downstream channel, marks this handle as aborted and notifies the listener.
+  *
+  * <p>Calling it again after the handle is aborted is a no-op. A failure while aborting one
+  * channel is logged and does not stop the remaining channels from being aborted.
+  */
+ @Override
+ public synchronized void abort() {
+   if (aborted) {
+     return;
+   }
+   LOGGER.debug("[StartAbortShuffleSinkHandle]");
+   for (ISinkHandle channel : downStreamChannelList) {
+     try {
+       channel.abort();
+     } catch (Exception e) {
+       // Best effort: keep aborting the other channels, but keep the cause in the log.
+       LOGGER.warn("Error occurred when try to abort channel.", e);
+     }
+   }
+   aborted = true;
+   sinkHandleListener.onAborted(this);
+   LOGGER.debug("[EndAbortShuffleSinkHandle]");
+ }
+
+ /**
+  * Closes every downstream channel, marks this handle as closed and notifies the listener.
+  *
+  * <p>Calling it again after the handle is closed is a no-op. A failure while closing one channel
+  * is logged and does not stop the remaining channels from being closed.
+  */
+ @Override
+ public synchronized void close() {
+   if (closed) {
+     return;
+   }
+   LOGGER.debug("[StartCloseShuffleSinkHandle]");
+   for (ISinkHandle channel : downStreamChannelList) {
+     try {
+       channel.close();
+     } catch (Exception e) {
+       // Best effort: keep closing the other channels, but keep the cause in the log.
+       LOGGER.warn("Error occurred when try to close channel.", e);
+     }
+   }
+   closed = true;
+   sinkHandleListener.onFinish(this);
+   LOGGER.debug("[EndCloseShuffleSinkHandle]");
+ }
+
+ @Override
+ public void setMaxBytesCanReserve(long maxBytesCanReserve) {
+ this.maxBytesCanReserve = maxBytesCanReserve;
+ downStreamChannelList.forEach(
+ sinkHandle -> sinkHandle.setMaxBytesCanReserve(maxBytesCanReserve));
+ }
+
+ private void checkState() {
+ if (aborted) {
+ throw new IllegalStateException("ShuffleSinkHandle is aborted.");
+ } else if (closed) {
+ throw new IllegalStateException("ShuffleSinkHandle is closed.");
+ }
+ }
+
+ private void switchChannelIfNecessary() {
+ shuffleStrategy.shuffle();
+ tryOpenChannel(downStreamChannelIndex.getCurrentIndex());
+ }
+
+ /**
+  * Opens the channel at the given index unless this handle has already opened it.
+  *
+  * @param channelIndex index into the downstream channel list; the element is cast to
+  *     {@link ISinkChannel}
+  */
+ @Override
+ public void tryOpenChannel(int channelIndex) {
+   if (!channelOpened[channelIndex]) {
+     ((ISinkChannel) downStreamChannelList.get(channelIndex)).open();
+     channelOpened[channelIndex] = true;
+   }
+ }
+
+ // region ============ Shuffle Related ============
+ public enum ShuffleStrategyEnum {
+ PLAIN,
+ SIMPLE_ROUND_ROBIN,
+ }
+
+ @FunctionalInterface
+ interface ShuffleStrategy {
+ /*
+ SinkHandle may have multiple channels, we need to choose the next channel each time we send a TsBlock.
+ */
+ void shuffle();
+ }
+
+ class PlainShuffleStrategy implements ShuffleStrategy {
+
+ @Override
+ public void shuffle() {
+ // do nothing
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "PlainShuffleStrategy needs to do nothing, current channel index is {}",
+ downStreamChannelIndex.getCurrentIndex());
+ }
+ }
+ }
+
+ /**
+  * Strategy that, after each send, moves to the next channel (in cyclic order after the current
+  * one) that passes {@code satisfy}. If no other channel qualifies, the current index is kept.
+  */
+ class SimpleRoundRobinStrategy implements ShuffleStrategy {
+
+ // Computed once when the strategy is instantiated (integer division first, then * 3); it is
+ // not refreshed if maxBytesCanReserve is changed afterwards.
+ // NOTE(review): channelNum must already be assigned and non-zero at instantiation time,
+ // otherwise this division fails.
+ private final long channelMemoryThreshold = maxBytesCanReserve / channelNum * 3;
+
+ @Override
+ public void shuffle() {
+ int currentIndex = downStreamChannelIndex.getCurrentIndex();
+ // Starts at offset 1, so the current channel itself is never re-evaluated.
+ for (int i = 1; i < channelNum; i++) {
+ int nextIndex = (currentIndex + i) % channelNum;
+ if (satisfy(nextIndex)) {
+ downStreamChannelIndex.setCurrentIndex(nextIndex);
+ return;
+ }
+ }
+ }
+
+ // A channel qualifies when it has not been marked as having no more TsBlocks, its retained
+ // size is within the threshold, and it buffers fewer than 3 TsBlocks.
+ private boolean satisfy(int channelIndex) {
+ // downStreamChannel is always an ISinkChannel
+ ISinkChannel channel = (ISinkChannel) downStreamChannelList.get(channelIndex);
+ if (channel.isNoMoreTsBlocks()) {
+ return false;
+ }
+ return channel.getRetainedSizeInBytes() <= channelMemoryThreshold
+ && channel.getNumOfBufferedTsBlocks() < 3;
+ }
+ }
+
+ private ShuffleStrategy getShuffleStrategy(ShuffleStrategyEnum strategyEnum) {
+ switch (strategyEnum) {
+ case PLAIN:
+ return new PlainShuffleStrategy();
+ case SIMPLE_ROUND_ROBIN:
+ return new SimpleRoundRobinStrategy();
+ default:
+ throw new UnsupportedOperationException("Unsupported type of shuffle strategy");
+ }
+ }
+
+ // endregion
+
+ // region ============= Test Only =============
+ /** Returns the sum of the retained buffer sizes of all downstream channels (0 if none). */
+ @TestOnly
+ @Override
+ public long getBufferRetainedSizeInBytes() {
+   return downStreamChannelList.stream()
+       .mapToLong(ISinkHandle::getBufferRetainedSizeInBytes)
+       .sum();
+ }
+ // endregion
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkHandle.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkHandle.java
index fbc36ed34d..ff3ca62043 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkHandle.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.sink;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
@@ -57,9 +57,9 @@ import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HAND
import static org.apache.iotdb.db.mpp.metric.DataExchangeCountMetricSet.SEND_NEW_DATA_BLOCK_NUM_CALLER;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
-public class SinkHandle implements ISinkHandle {
+public class SinkHandle implements ISinkHandle, ISinkChannel {
- private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SinkHandle.class);
public static final int MAX_ATTEMPT_TIMES = 3;
private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000L;
@@ -80,7 +80,7 @@ public class SinkHandle implements ISinkHandle {
private long retryIntervalInMs;
// Use LinkedHashMap to meet 2 needs,
- // 1. Predictable iteration order so that removing buffered tsblocks can be efficient.
+ // 1. Predictable iteration order so that removing buffered TsBlocks can be efficient.
// 2. Fast lookup.
private final LinkedHashMap<Integer, Pair<TsBlock, Long>> sequenceIdToTsBlock =
new LinkedHashMap<>();
@@ -102,7 +102,7 @@ public class SinkHandle implements ISinkHandle {
private boolean noMoreTsBlocks = false;
- /** max bytes this SourceHandle can reserve. */
+ /** max bytes this SinkHandle can reserve. */
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
@@ -138,20 +138,11 @@ public class SinkHandle implements ISinkHandle {
localFragmentInstanceId.queryId,
localFragmentInstanceId.fragmentId,
localFragmentInstanceId.instanceId);
- this.blocked =
- localMemoryManager
- .getQueryPool()
- .reserve(
- localFragmentInstanceId.getQueryId(),
- fullFragmentInstanceId,
- localPlanNodeId,
- DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
- DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know maxBytesCanReserve after
- // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because
- // at first this SinkHandle has not reserved memory.
- .left;
this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
this.currentTsBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+ // delete this once ShuffleSinkHandle is finished
+ open();
}
@Override
@@ -203,14 +194,9 @@ public class SinkHandle implements ISinkHandle {
}
}
- @Override
- public synchronized void send(int partition, List<TsBlock> tsBlocks) {
- throw new UnsupportedOperationException();
- }
-
@Override
public synchronized void setNoMoreTsBlocks() {
- logger.debug("[StartSetNoMoreTsBlocks]");
+ LOGGER.debug("[StartSetNoMoreTsBlocks]");
if (aborted || closed) {
return;
}
@@ -219,7 +205,7 @@ public class SinkHandle implements ISinkHandle {
@Override
public synchronized void abort() {
- logger.debug("[StartAbortSinkHandle]");
+ LOGGER.debug("[StartAbortSinkHandle]");
sequenceIdToTsBlock.clear();
aborted = true;
bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
@@ -238,12 +224,12 @@ public class SinkHandle implements ISinkHandle {
.clearMemoryReservationMap(
localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
sinkHandleListener.onAborted(this);
- logger.debug("[EndAbortSinkHandle]");
+ LOGGER.debug("[EndAbortSinkHandle]");
}
@Override
public synchronized void close() {
- logger.debug("[StartCloseSinkHandle]");
+ LOGGER.debug("[StartCloseSinkHandle]");
sequenceIdToTsBlock.clear();
closed = true;
bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
@@ -262,7 +248,7 @@ public class SinkHandle implements ISinkHandle {
.clearMemoryReservationMap(
localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
sinkHandleListener.onFinish(this);
- logger.debug("[EndCloseSinkHandle]");
+ LOGGER.debug("[EndCloseSinkHandle]");
}
@Override
@@ -280,17 +266,13 @@ public class SinkHandle implements ISinkHandle {
return bufferRetainedSizeInBytes;
}
- public int getNumOfBufferedTsBlocks() {
- return sequenceIdToTsBlock.size();
- }
-
- ByteBuffer getSerializedTsBlock(int partition, int sequenceId) {
+ public ByteBuffer getSerializedTsBlock(int partition, int sequenceId) {
throw new UnsupportedOperationException();
}
- synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
+ public synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
if (aborted || closed) {
- logger.warn(
+ LOGGER.warn(
"SinkHandle still receive getting TsBlock request after being aborted={} or closed={}",
aborted,
closed);
@@ -298,7 +280,7 @@ public class SinkHandle implements ISinkHandle {
}
Pair<TsBlock, Long> pair = sequenceIdToTsBlock.get(sequenceId);
if (pair == null || pair.left == null) {
- logger.error(
+ LOGGER.warn(
"The TsBlock doesn't exist. Sequence ID is {}, remaining map is {}",
sequenceId,
sequenceIdToTsBlock.entrySet());
@@ -307,7 +289,7 @@ public class SinkHandle implements ISinkHandle {
return serde.serialize(pair.left);
}
- void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
+ public void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
long freedBytes = 0L;
synchronized (this) {
if (aborted || closed) {
@@ -327,7 +309,7 @@ public class SinkHandle implements ISinkHandle {
freedBytes += entry.getValue().right;
bufferRetainedSizeInBytes -= entry.getValue().right;
iterator.remove();
- logger.debug("[ACKTsBlock] {}.", entry.getKey());
+ LOGGER.debug("[ACKTsBlock] {}.", entry.getKey());
}
}
if (isFinished()) {
@@ -346,18 +328,7 @@ public class SinkHandle implements ISinkHandle {
}
}
- public TEndPoint getRemoteEndpoint() {
- return remoteEndpoint;
- }
-
- public TFragmentInstanceId getRemoteFragmentInstanceId() {
- return remoteFragmentInstanceId;
- }
-
- public String getRemotePlanNodeId() {
- return remotePlanNodeId;
- }
-
+ @Override
public TFragmentInstanceId getLocalFragmentInstanceId() {
return localFragmentInstanceId;
}
@@ -384,11 +355,49 @@ public class SinkHandle implements ISinkHandle {
}
}
+ // region ============ ISinkChannel related ============
+
+ /**
+  * Reserves {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES} from the query memory pool for this handle
+  * and stores the returned future in {@code blocked}.
+  *
+  * <p>NOTE(review): each call issues a new reservation and overwrites {@code blocked}; callers
+  * should make sure it is invoked only once per handle.
+  */
+ @Override
+ public void open() {
+   // SinkHandle is opened when ShuffleSinkHandle choose it as the next channel
+   this.blocked =
+       localMemoryManager
+           .getQueryPool()
+           .reserve(
+               localFragmentInstanceId.getQueryId(),
+               fullFragmentInstanceId,
+               localPlanNodeId,
+               DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+               DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know maxBytesCanReserve after
+           // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because
+           // at first this SinkHandle has not reserved memory.
+           .left;
+ }
+
+ @Override
+ public boolean isNoMoreTsBlocks() {
+ return noMoreTsBlocks;
+ }
+
+ @Override
+ public long getRetainedSizeInBytes() {
+ return bufferRetainedSizeInBytes;
+ }
+
+ @Override
+ public int getNumOfBufferedTsBlocks() {
+ return sequenceIdToTsBlock.size();
+ }
+
+ // endregion
+
+ // region ============ TestOnly ============
@TestOnly
public void setRetryIntervalInMs(long retryIntervalInMs) {
this.retryIntervalInMs = retryIntervalInMs;
}
+ // endregion
+ // region ============ inner class ============
/**
* Send a {@link org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent} to downstream fragment
* instance.
@@ -411,7 +420,7 @@ public class SinkHandle implements ISinkHandle {
@Override
public void run() {
try (SetThreadName sinkHandleName = new SetThreadName(threadName)) {
- logger.debug(
+ LOGGER.debug(
"[NotifyNewTsBlock] [{}, {})", startSequenceId, startSequenceId + blockSizes.size());
int attempt = 0;
TNewDataBlockEvent newDataBlockEvent =
@@ -429,7 +438,7 @@ public class SinkHandle implements ISinkHandle {
client.onNewDataBlockEvent(newDataBlockEvent);
break;
} catch (Exception e) {
- logger.warn("Failed to send new data block event, attempt times: {}", attempt, e);
+ LOGGER.warn("Failed to send new data block event, attempt times: {}", attempt, e);
if (attempt == MAX_ATTEMPT_TIMES) {
sinkHandleListener.onFailure(SinkHandle.this, e);
}
@@ -458,7 +467,7 @@ public class SinkHandle implements ISinkHandle {
@Override
public void run() {
try (SetThreadName sinkHandleName = new SetThreadName(threadName)) {
- logger.debug("[NotifyNoMoreTsBlock]");
+ LOGGER.debug("[NotifyNoMoreTsBlock]");
int attempt = 0;
TEndOfDataBlockEvent endOfDataBlockEvent =
new TEndOfDataBlockEvent(
@@ -473,9 +482,9 @@ public class SinkHandle implements ISinkHandle {
client.onEndOfDataBlockEvent(endOfDataBlockEvent);
break;
} catch (Exception e) {
- logger.warn("Failed to send end of data block event, attempt times: {}", attempt, e);
+ LOGGER.warn("Failed to send end of data block event, attempt times: {}", attempt, e);
if (attempt == MAX_ATTEMPT_TIMES) {
- logger.warn("Failed to send end of data block event after all retry", e);
+ LOGGER.warn("Failed to send end of data block event after all retry", e);
sinkHandleListener.onFailure(SinkHandle.this, e);
return;
}
@@ -495,4 +504,6 @@ public class SinkHandle implements ISinkHandle {
}
}
}
+ // endregion
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
index d056717060..14ac3429e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.source;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
index f47dea04d6..fd639ce0e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.source;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index 386fdd40b8..d03da2b10a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.exchange;
+package org.apache.iotdb.db.mpp.execution.exchange.source;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
@@ -72,6 +72,8 @@ public class SourceHandle implements ISourceHandle {
private final String fullFragmentInstanceId;
private final String localPlanNodeId;
+
+ private final int indexOfUpstreamSinkHandle;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
private final TsBlockSerde serde;
@@ -117,6 +119,7 @@ public class SourceHandle implements ISourceHandle {
TFragmentInstanceId remoteFragmentInstanceId,
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
+ int indexOfUpstreamSinkHandle,
LocalMemoryManager localMemoryManager,
ExecutorService executorService,
TsBlockSerde serde,
@@ -129,6 +132,7 @@ public class SourceHandle implements ISourceHandle {
this.fullFragmentInstanceId =
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
+ this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.executorService = Validate.notNull(executorService);
this.serde = Validate.notNull(serde);
@@ -278,7 +282,7 @@ public class SourceHandle implements ISourceHandle {
return nonCancellationPropagating(blocked);
}
- synchronized void setNoMoreTsBlocks(int lastSequenceId) {
+ public synchronized void setNoMoreTsBlocks(int lastSequenceId) {
logger.debug("[ReceiveNoMoreTsBlockEvent]");
this.lastSequenceId = lastSequenceId;
if (!blocked.isDone() && remoteTsBlockedConsumedUp()) {
@@ -289,7 +293,8 @@ public class SourceHandle implements ISourceHandle {
}
}
- synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
+ public synchronized void updatePendingDataBlockInfo(
+ int startSequenceId, List<Long> dataBlockSizes) {
logger.debug(
"[ReceiveNewTsBlockNotification] [{}, {}), each size is: {}",
startSequenceId,
@@ -467,7 +472,11 @@ public class SourceHandle implements ISourceHandle {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
logger.debug("[StartPullTsBlocksFromRemote] [{}, {}) ", startSequenceId, endSequenceId);
TGetDataBlockRequest req =
- new TGetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId);
+ new TGetDataBlockRequest(
+ remoteFragmentInstanceId,
+ startSequenceId,
+ endSequenceId,
+ indexOfUpstreamSinkHandle);
int attempt = 0;
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
@@ -565,7 +574,10 @@ public class SourceHandle implements ISourceHandle {
int attempt = 0;
TAcknowledgeDataBlockEvent acknowledgeDataBlockEvent =
new TAcknowledgeDataBlockEvent(
- remoteFragmentInstanceId, startSequenceId, endSequenceId);
+ remoteFragmentInstanceId,
+ startSequenceId,
+ endSequenceId,
+ indexOfUpstreamSinkHandle);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
long startTime = System.nanoTime();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 14ada6d7c0..8c76bf989b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.utils.SetThreadName;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 794174b939..1e4f529a80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
new file mode 100644
index 0000000000..01ecb7f71b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.sink;
+
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+/**
+ * Sink-side operator that consumes its children strictly in list order, one child at a time.
+ *
+ * <p>The child currently being consumed is identified by the shared {@link
+ * DownStreamChannelIndex}. When that child is exhausted, the sink handle is told that the
+ * corresponding channel has no more TsBlocks, the index is moved to the next child and the sink
+ * handle is asked to open the matching channel. The operator is finished once the last child has
+ * been exhausted.
+ */
+public class IdentitySinkOperator implements Operator {
+
+  private final OperatorContext operatorContext;
+  private final List<Operator> children;
+
+  /** Index of the child/channel currently being consumed; also handed to the sink handle. */
+  private final DownStreamChannelIndex downStreamChannelIndex;
+
+  private final ISinkHandle sinkHandle;
+
+  /** When true, the next call to {@link #next()} returns null once instead of reading a child. */
+  private boolean needToReturnNull = false;
+
+  private boolean isFinished = false;
+
+  public IdentitySinkOperator(
+      OperatorContext operatorContext,
+      List<Operator> children,
+      DownStreamChannelIndex downStreamChannelIndex,
+      ISinkHandle sinkHandle) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.downStreamChannelIndex = downStreamChannelIndex;
+    this.sinkHandle = sinkHandle;
+  }
+
+  /**
+   * Reports whether another call to {@link #next()} is meaningful.
+   *
+   * <p>Returns true while the current child has data, and also right after switching to the next
+   * child (in which case {@link #next()} returns null once). Returns false once the last child is
+   * exhausted; further calls keep returning false without touching the children or the sink
+   * handle again.
+   */
+  @Override
+  public boolean hasNext() {
+    if (isFinished) {
+      // Already finished: do not re-query the last child or re-notify the sink handle.
+      return false;
+    }
+    // Read the shared index once so every step below refers to the same channel.
+    int currentIndex = downStreamChannelIndex.getCurrentIndex();
+    if (children.get(currentIndex).hasNext()) {
+      return true;
+    }
+    // the current channel has no more data
+    sinkHandle.setNoMoreTsBlocksOfOneChannel(currentIndex);
+    currentIndex++;
+    if (currentIndex >= children.size()) {
+      isFinished = true;
+      return false;
+    }
+    downStreamChannelIndex.setCurrentIndex(currentIndex);
+    // if we reach here, it means that isBlocked() is called on a different child
+    // we need to ensure that this child is not blocked. We set this field to true here so that we
+    // can begin another loop in Driver.
+    needToReturnNull = true;
+    // tryOpenChannel first
+    sinkHandle.tryOpenChannel(currentIndex);
+    return true;
+  }
+
+  /**
+   * Returns the next TsBlock of the current child, or null exactly once after {@link #hasNext()}
+   * switched to a new child.
+   */
+  @Override
+  public TsBlock next() {
+    if (needToReturnNull) {
+      needToReturnNull = false;
+      return null;
+    }
+    return children.get(downStreamChannelIndex.getCurrentIndex()).next();
+  }
+
+  /** Delegates to the child selected by the current channel index. */
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return children.get(downStreamChannelIndex.getCurrentIndex()).isBlocked();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return isFinished;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * Closes every child, even if some of them fail to close.
+   *
+   * @throws Exception the first failure raised by a child; later failures are attached to it as
+   *     suppressed exceptions
+   */
+  @Override
+  public void close() throws Exception {
+    Exception firstFailure = null;
+    for (Operator child : children) {
+      try {
+        child.close();
+      } catch (Exception e) {
+        if (firstFailure == null) {
+          firstFailure = e;
+        } else if (e != firstFailure) {
+          // addSuppressed rejects self-suppression, hence the identity check
+          firstFailure.addSuppressed(e);
+        }
+      }
+    }
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
+  /** Returns the largest peek memory reported by any child (0 when there are no children). */
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  /** Returns the largest return size reported by any child (0 when there are no children). */
+  @Override
+  public long calculateMaxReturnSize() {
+    long maxReturnSize = 0;
+    for (Operator child : children) {
+      maxReturnSize = Math.max(maxReturnSize, child.calculateMaxReturnSize());
+    }
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
new file mode 100644
index 0000000000..7e643ca576
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.sink;
+
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Sink-side operator that serves whichever child the shared {@link DownStreamChannelIndex}
+ * currently points at, and tracks which children are not yet exhausted.
+ *
+ * <p>When the current child is exhausted, the sink handle is told that the corresponding channel
+ * has no more TsBlocks and the index is moved, in circular order, to the next child that is still
+ * unfinished. The operator is finished once every child has been exhausted.
+ *
+ * <p>NOTE(review): the index object is also passed to the sink handle, which presumably moves it
+ * between channels on its own; confirm against the sink handle implementation.
+ */
+public class ShuffleHelperOperator implements Operator {
+  private final OperatorContext operatorContext;
+  private final List<Operator> children;
+
+  /** Index of the child/channel currently being consumed; also handed to the sink handle. */
+  private final DownStreamChannelIndex downStreamChannelIndex;
+
+  private final ISinkHandle sinkHandle;
+
+  /** Indexes of the children that have not yet reported {@code hasNext() == false}. */
+  private final Set<Integer> unfinishedChildren;
+
+  /** When true, the next call to {@link #next()} returns null once instead of reading a child. */
+  private boolean needToReturnNull = false;
+
+  public ShuffleHelperOperator(
+      OperatorContext operatorContext,
+      List<Operator> children,
+      DownStreamChannelIndex downStreamChannelIndex,
+      ISinkHandle sinkHandle) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.downStreamChannelIndex = downStreamChannelIndex;
+    this.sinkHandle = sinkHandle;
+    this.unfinishedChildren = new HashSet<>(children.size());
+    for (int i = 0; i < children.size(); i++) {
+      unfinishedChildren.add(i);
+    }
+  }
+
+  /**
+   * Reports whether another call to {@link #next()} is meaningful.
+   *
+   * <p>Returns true while the current child has data, and also right after switching to another
+   * unfinished child (in which case {@link #next()} returns null once). Returns false once every
+   * child is exhausted, so that no channel is re-opened after it was declared to have no more
+   * TsBlocks.
+   */
+  @Override
+  public boolean hasNext() {
+    if (unfinishedChildren.isEmpty()) {
+      return false;
+    }
+    // Read the shared index once so every step below refers to the same channel.
+    int currentIndex = downStreamChannelIndex.getCurrentIndex();
+    if (children.get(currentIndex).hasNext()) {
+      return true;
+    }
+    // the current channel has no more data; notify the sink handle only the first time
+    if (unfinishedChildren.contains(currentIndex)) {
+      sinkHandle.setNoMoreTsBlocksOfOneChannel(currentIndex);
+      unfinishedChildren.remove(currentIndex);
+    }
+    int nextIndex = nextUnfinishedChild(currentIndex);
+    if (nextIndex < 0) {
+      // every child is exhausted: leave the index alone and do not open any channel
+      return false;
+    }
+    downStreamChannelIndex.setCurrentIndex(nextIndex);
+    // if we reach here, it means that isBlocked() is called on a different child
+    // we need to ensure that this child is not blocked. We set this field to true here so that we
+    // can begin another loop in Driver.
+    needToReturnNull = true;
+    // tryOpenChannel first
+    sinkHandle.tryOpenChannel(nextIndex);
+    return true;
+  }
+
+  /**
+   * Finds the first unfinished child after {@code fromIndex} in circular order, or -1 if no
+   * child is left unfinished.
+   */
+  private int nextUnfinishedChild(int fromIndex) {
+    int childCount = children.size();
+    for (int step = 1; step <= childCount; step++) {
+      int candidate = (fromIndex + step) % childCount;
+      if (unfinishedChildren.contains(candidate)) {
+        return candidate;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Returns the next TsBlock of the current child, or null exactly once after {@link #hasNext()}
+   * switched to a different child.
+   */
+  @Override
+  public TsBlock next() {
+    if (needToReturnNull) {
+      needToReturnNull = false;
+      return null;
+    }
+    return children.get(downStreamChannelIndex.getCurrentIndex()).next();
+  }
+
+  /** Delegates to the child selected by the current channel index. */
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return children.get(downStreamChannelIndex.getCurrentIndex()).isBlocked();
+  }
+
+  /** Returns true once every child has been observed to have no more data. */
+  @Override
+  public boolean isFinished() {
+    return unfinishedChildren.isEmpty();
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * Closes every child, even if some of them fail to close.
+   *
+   * @throws Exception the first failure raised by a child; later failures are attached to it as
+   *     suppressed exceptions
+   */
+  @Override
+  public void close() throws Exception {
+    Exception firstFailure = null;
+    for (Operator child : children) {
+      try {
+        child.close();
+      } catch (Exception e) {
+        if (firstFailure == null) {
+          firstFailure = e;
+        } else if (e != firstFailure) {
+          // addSuppressed rejects self-suppression, hence the identity check
+          firstFailure.addSuppressed(e);
+        }
+      }
+    }
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
+  /** Returns the largest peek memory reported by any child (0 when there are no children). */
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  /** Returns the largest return size reported by any child (0 when there are no children). */
+  @Override
+  public long calculateMaxReturnSize() {
+    long maxReturnSize = 0;
+    for (Operator child : children) {
+      maxReturnSize = Math.max(maxReturnSize, child.calculateMaxReturnSize());
+    }
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 7e7498fbc1..ba57e1ff07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.source;
-import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index c436b180f8..86ad87c780 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
import org.apache.iotdb.db.mpp.execution.schedule.ExecutionContext;
import org.apache.iotdb.db.mpp.execution.schedule.queue.ID;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index ae8722f1d7..50278149c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -32,8 +32,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.QueryState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
-import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.mpp.metric.PerformanceOverviewMetricsManager;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -563,12 +563,14 @@ public class QueryExecution implements IQueryExecution {
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
context.getResultNodeContext().getVirtualResultNodeId().getId(),
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
+ 0, // Upstream of result ExchangeNode will only have one child.
stateMachine::transitionToFailed)
: MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()
.createSourceHandle(
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
context.getResultNodeContext().getVirtualResultNodeId().getId(),
+ 0,
upstreamEndPoint,
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
stateMachine::transitionToFailed);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index e2b60ef441..a071b4b83c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.mpp.plan.execution.memory;
import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 0404be866a..f6e808e2f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 49706b4ebe..74d96ccc9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -33,11 +33,13 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.NodeRef;
import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
-import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
-import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
-import org.apache.iotdb.db.mpp.execution.exchange.LocalSinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelIndex;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ShuffleSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -112,6 +114,8 @@ import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryMergeOperato
import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryOrderByHeatOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.source.SchemaSourceFactory;
+import org.apache.iotdb.db.mpp.execution.operator.sink.IdentitySinkOperator;
+import org.apache.iotdb.db.mpp.execution.operator.sink.ShuffleHelperOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
@@ -171,6 +175,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryColl
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.IdentitySinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -1806,10 +1812,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
remoteInstanceId.toThrift(),
+ node.getIndexOfUpstreamSinkHandle(),
context.getInstanceContext()::failed)
: MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
+ node.getIndexOfUpstreamSinkHandle(),
upstreamEndPoint,
remoteInstanceId.toThrift(),
context.getInstanceContext()::failed);
@@ -1854,6 +1862,74 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return child;
}
+ /**
+ * Builds the {@link IdentitySinkOperator} for an {@link IdentitySinkNode}.
+ *
+ * <p>Registers an operator context for the sink, visits every child of the node to build the
+ * child operators, creates a shuffle sink handle with the {@code PLAIN} strategy, and installs
+ * that handle on the driver context.
+ *
+ * @param node the sink plan node being translated
+ * @param context the local execution plan context used to build operators
+ * @return the sink operator wrapping the operators built for the node's children
+ */
+ @Override
+ public Operator visitIdentitySink(IdentitySinkNode node, LocalExecutionPlanContext context) {
+ context.addExchangeSumNum(1);
+ // The sink's operator context is registered before the children are visited, so the id from
+ // getNextOperatorId() is taken before any child takes one.
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ IdentitySinkOperator.class.getSimpleName());
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+ List<Operator> children =
+ node.getChildren().stream()
+ .map(child -> child.accept(this, context))
+ .collect(Collectors.toList());
+
+ checkArgument(
+ MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
+ FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
+ // The same index object (starting at 0) is handed to both the sink handle and the operator.
+ DownStreamChannelIndex downStreamChannelIndex = new DownStreamChannelIndex(0);
+ ISinkHandle sinkHandle =
+ MPP_DATA_EXCHANGE_MANAGER.createShuffleSinkHandle(
+ node.getDownStreamChannelLocationList(),
+ downStreamChannelIndex,
+ ShuffleSinkHandle.ShuffleStrategyEnum.PLAIN,
+ localInstanceId.toThrift(),
+ node.getPlanNodeId().getId(),
+ context.getInstanceContext());
+ sinkHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve());
+ context.getDriverContext().setSinkHandle(sinkHandle);
+
+ return new IdentitySinkOperator(operatorContext, children, downStreamChannelIndex, sinkHandle);
+ }
+
+ /**
+ * Builds the {@link ShuffleHelperOperator} for a {@link ShuffleSinkNode}.
+ *
+ * <p>Registers an operator context for the sink, visits every child of the node to build the
+ * child operators, creates a shuffle sink handle with the {@code SIMPLE_ROUND_ROBIN} strategy,
+ * and installs that handle on the driver context.
+ *
+ * @param node the sink plan node being translated
+ * @param context the local execution plan context used to build operators
+ * @return the sink operator wrapping the operators built for the node's children
+ */
+ @Override
+ public Operator visitShuffleSink(ShuffleSinkNode node, LocalExecutionPlanContext context) {
+ context.addExchangeSumNum(1);
+ // The sink's operator context is registered before the children are visited, so the id from
+ // getNextOperatorId() is taken before any child takes one.
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ ShuffleHelperOperator.class.getSimpleName());
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+ List<Operator> children =
+ node.getChildren().stream()
+ .map(child -> child.accept(this, context))
+ .collect(Collectors.toList());
+
+ checkArgument(
+ MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
+ FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
+ // The same index object (starting at 0) is handed to both the sink handle and the operator.
+ DownStreamChannelIndex downStreamChannelIndex = new DownStreamChannelIndex(0);
+ ISinkHandle sinkHandle =
+ MPP_DATA_EXCHANGE_MANAGER.createShuffleSinkHandle(
+ node.getDownStreamChannelLocationList(),
+ downStreamChannelIndex,
+ ShuffleSinkHandle.ShuffleStrategyEnum.SIMPLE_ROUND_ROBIN,
+ localInstanceId.toThrift(),
+ node.getPlanNodeId().getId(),
+ context.getInstanceContext());
+ sinkHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve());
+ context.getDriverContext().setSinkHandle(sinkHandle);
+
+ return new ShuffleHelperOperator(operatorContext, children, downStreamChannelIndex, sinkHandle);
+ }
+
@Override
public Operator visitSchemaFetchMerge(
SchemaFetchMergeNode node, LocalExecutionPlanContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 2527f5a74f..7b598ef8c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -73,6 +73,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryColl
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.IdentitySinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -162,7 +164,9 @@ public enum PlanNodeType {
MERGE_SORT((short) 66),
SHOW_QUERIES((short) 67),
INTERNAL_BATCH_ACTIVATE_TEMPLATE((short) 68),
- INTERNAL_CREATE_MULTI_TIMESERIES((short) 69);
+ INTERNAL_CREATE_MULTI_TIMESERIES((short) 69),
+ IDENTITY_SINK((short) 70),
+ SHUFFLE_SINK((short) 71);
public static final int BYTES = Short.BYTES;
@@ -351,6 +355,10 @@ public enum PlanNodeType {
return InternalBatchActivateTemplateNode.deserialize(buffer);
case 69:
return InternalCreateMultiTimeSeriesNode.deserialize(buffer);
+ case 70:
+ return IdentitySinkNode.deserialize(buffer);
+ case 71:
+ return ShuffleSinkNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8181efcfa3..2e87d388c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -71,6 +71,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryColl
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.IdentitySinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -356,4 +358,12 @@ public abstract class PlanVisitor<R, C> {
public R visitInternalCreateMultiTimeSeries(InternalCreateMultiTimeSeriesNode node, C context) {
return visitPlan(node, context);
}
+
+ /** Visits an {@link IdentitySinkNode}; this default implementation delegates to visitPlan. */
+ public R visitIdentitySink(IdentitySinkNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ /** Visits a {@link ShuffleSinkNode}; this default implementation delegates to visitPlan. */
+ public R visitShuffleSink(ShuffleSinkNode node, C context) {
+ return visitPlan(node, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index 06310be926..7e35ec85da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -50,6 +50,9 @@ public class ExchangeNode extends SingleChildProcessNode {
private List<String> outputColumnNames;
+ /** Exchange needs to know which child of IdentitySinkNode/ShuffleSinkNode it matches */
+ private int indexOfUpstreamSinkHandle = 0;
+
public ExchangeNode(PlanNodeId id) {
super(id);
}
@@ -102,10 +105,12 @@ public class ExchangeNode extends SingleChildProcessNode {
outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
outputColumnNamesSize--;
}
+ int index = ReadWriteIOUtils.readInt(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
exchangeNode.setOutputColumnNames(outputColumnNames);
+ exchangeNode.setIndexOfUpstreamSinkHandle(index);
return exchangeNode;
}
@@ -120,6 +125,7 @@ public class ExchangeNode extends SingleChildProcessNode {
for (String outputColumnName : outputColumnNames) {
ReadWriteIOUtils.write(outputColumnName, byteBuffer);
}
+ ReadWriteIOUtils.write(indexOfUpstreamSinkHandle, byteBuffer);
}
@Override
@@ -133,6 +139,7 @@ public class ExchangeNode extends SingleChildProcessNode {
for (String outputColumnName : outputColumnNames) {
ReadWriteIOUtils.write(outputColumnName, stream);
}
+ ReadWriteIOUtils.write(indexOfUpstreamSinkHandle, stream);
}
@Override
@@ -154,6 +161,14 @@ public class ExchangeNode extends SingleChildProcessNode {
return remoteSourceNode;
}
+ /**
+ * Returns the stored index of the upstream sink handle for this exchange node (the field is
+ * initialised to 0).
+ */
+ public int getIndexOfUpstreamSinkHandle() {
+ return indexOfUpstreamSinkHandle;
+ }
+
+ /**
+ * Sets the index of the upstream sink handle for this exchange node.
+ *
+ * @param indexOfUpstreamSinkHandle the index to store; no range check is performed here
+ */
+ public void setIndexOfUpstreamSinkHandle(int indexOfUpstreamSinkHandle) {
+ this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
+ }
+
public void setRemoteSourceNode(FragmentSinkNode remoteSourceNode) {
this.remoteSourceNode = remoteSourceNode;
this.setOutputColumnNames(remoteSourceNode.getOutputColumnNames());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/IdentitySinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/IdentitySinkNode.java
new file mode 100644
index 0000000000..a4e549245c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/IdentitySinkNode.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.sink;
+
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Sink plan node with multiple children, serialized under the {@code IDENTITY_SINK} plan node
+ * type together with its list of downstream channel locations.
+ */
+public class IdentitySinkNode extends MultiChildrenSinkNode {
+
+  /** Creates a node with the given downstream channel locations and no explicit children. */
+  public IdentitySinkNode(
+      PlanNodeId id, List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id, downStreamChannelLocationList);
+  }
+
+  /** Creates a node with the given children and downstream channel locations. */
+  public IdentitySinkNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id, children, downStreamChannelLocationList);
+  }
+
+  /**
+   * Returns a new node with the same plan node id and the same location list instance; children
+   * are not carried over.
+   */
+  @Override
+  public PlanNode clone() {
+    PlanNodeId sameId = getPlanNodeId();
+    List<DownStreamChannelLocation> sameLocations = getDownStreamChannelLocationList();
+    return new IdentitySinkNode(sameId, sameLocations);
+  }
+
+  /** Always returns null: this node does not report output column names. */
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  /** Writes the type tag, the number of locations, then each location to the buffer. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.IDENTITY_SINK.serialize(byteBuffer);
+    int locationCount = downStreamChannelLocationList.size();
+    ReadWriteIOUtils.write(locationCount, byteBuffer);
+    for (DownStreamChannelLocation location : downStreamChannelLocationList) {
+      location.serialize(byteBuffer);
+    }
+  }
+
+  /** Writes the type tag, the number of locations, then each location to the stream. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.IDENTITY_SINK.serialize(stream);
+    int locationCount = downStreamChannelLocationList.size();
+    ReadWriteIOUtils.write(locationCount, stream);
+    for (DownStreamChannelLocation location : downStreamChannelLocationList) {
+      location.serialize(stream);
+    }
+  }
+
+  /**
+   * Reads a location count, that many locations, and finally the plan node id from the buffer.
+   *
+   * <p>The type tag written by {@code serializeAttributes} is not read here; it is presumably
+   * consumed by the caller that dispatches on the node type.
+   */
+  public static IdentitySinkNode deserialize(ByteBuffer byteBuffer) {
+    int remaining = ReadWriteIOUtils.readInt(byteBuffer);
+    List<DownStreamChannelLocation> locations = new ArrayList<>();
+    while (remaining > 0) {
+      locations.add(DownStreamChannelLocation.deserialize(byteBuffer));
+      remaining--;
+    }
+    PlanNodeId nodeId = PlanNodeId.deserialize(byteBuffer);
+    return new IdentitySinkNode(nodeId, locations);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
new file mode 100644
index 0000000000..f1fbd92e3f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.sink;
+
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for sink nodes that hold a list of child plan nodes together with a list of
+ * downstream channel locations.
+ */
+public abstract class MultiChildrenSinkNode extends SinkNode {
+
+  /** Child plan nodes; can be replaced through {@link #setChildren(List)}. */
+  protected List<PlanNode> children;
+
+  /** Downstream channel locations of this sink node. */
+  protected final List<DownStreamChannelLocation> downStreamChannelLocationList;
+
+  /** Creates a node with new, empty child and downstream channel location lists. */
+  public MultiChildrenSinkNode(PlanNodeId id) {
+    this(id, new ArrayList<>(), new ArrayList<>());
+  }
+
+  /** Creates a node that stores the given lists as-is (no copies are made). */
+  protected MultiChildrenSinkNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id);
+    this.children = children;
+    this.downStreamChannelLocationList = downStreamChannelLocationList;
+  }
+
+  /** Creates a node with a new, empty child list and the given location list stored as-is. */
+  protected MultiChildrenSinkNode(
+      PlanNodeId id, List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    this(id, new ArrayList<>(), downStreamChannelLocationList);
+  }
+
+  /** Replaces the child list with {@code children}. */
+  public void setChildren(List<PlanNode> children) {
+    this.children = children;
+  }
+
+  /** Returns the child list itself, not a copy. */
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  /** Appends {@code child} to the child list. */
+  @Override
+  public void addChild(PlanNode child) {
+    children.add(child);
+  }
+
+  /** Returns {@code CHILD_COUNT_NO_LIMIT}. */
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
+  /**
+   * Two nodes are equal when they are the same instance, or when they have the same runtime
+   * class, the superclass considers them equal, and their child lists are equal. The downstream
+   * channel location list is not part of the comparison.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    final boolean sameKind = o != null && getClass() == o.getClass() && super.equals(o);
+    return sameKind && children.equals(((MultiChildrenSinkNode) o).children);
+  }
+
+  /** Combines the superclass hash code with the hash of the child list. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), children);
+  }
+
+  /** Does nothing in this class. */
+  @Override
+  public void send() {}
+
+  /** Does nothing in this class. */
+  @Override
+  public void close() throws Exception {}
+
+  /** Returns the downstream channel location list itself, not a copy. */
+  public List<DownStreamChannelLocation> getDownStreamChannelLocationList() {
+    return downStreamChannelLocationList;
+  }
+
+  /** Appends {@code downStreamChannelLocation} to the downstream channel location list. */
+  public void addDownStreamChannelLocation(DownStreamChannelLocation downStreamChannelLocation) {
+    downStreamChannelLocationList.add(downStreamChannelLocation);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/ShuffleSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/ShuffleSinkNode.java
new file mode 100644
index 0000000000..81937f8140
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/ShuffleSinkNode.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.sink;
+
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Responsible for creating ShuffleHelperOperator and corresponding SinkHandle.
+ *
+ * <p>Its serialized attributes are the {@code SHUFFLE_SINK} plan node type, the number of
+ * downstream channel locations, and then each location in list order.
+ */
+public class ShuffleSinkNode extends MultiChildrenSinkNode {
+
+  /** Creates a shuffle sink node with the given id and downstream channel locations. */
+  public ShuffleSinkNode(
+      PlanNodeId id, List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id, downStreamChannelLocationList);
+  }
+
+  /** Creates a shuffle sink node with the given id, children and downstream channel locations. */
+  public ShuffleSinkNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      List<DownStreamChannelLocation> downStreamChannelLocationList) {
+    super(id, children, downStreamChannelLocationList);
+  }
+
+  /**
+   * Returns a new {@code ShuffleSinkNode} built from this node's plan node id and downstream
+   * channel location list. The list is passed on without being copied, and no children are passed
+   * to the new node.
+   */
+  @Override
+  public PlanNode clone() {
+    final PlanNodeId sameId = getPlanNodeId();
+    final List<DownStreamChannelLocation> sameLocations = getDownStreamChannelLocationList();
+    return new ShuffleSinkNode(sameId, sameLocations);
+  }
+
+  /** Returns {@code CHILD_COUNT_NO_LIMIT}. */
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
+  /** Always returns {@code null}. */
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  /** Writes the node type, the location count and every location to {@code byteBuffer}. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.SHUFFLE_SINK.serialize(byteBuffer);
+    final int locationCount = downStreamChannelLocationList.size();
+    ReadWriteIOUtils.write(locationCount, byteBuffer);
+    downStreamChannelLocationList.forEach(location -> location.serialize(byteBuffer));
+  }
+
+  /**
+   * Writes the node type, the location count and every location to {@code stream}.
+   *
+   * @throws IOException if writing to {@code stream} fails
+   */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.SHUFFLE_SINK.serialize(stream);
+    final int locationCount = downStreamChannelLocationList.size();
+    ReadWriteIOUtils.write(locationCount, stream);
+    for (int i = 0; i < locationCount; i++) {
+      downStreamChannelLocationList.get(i).serialize(stream);
+    }
+  }
+
+  /**
+   * Reads a {@code ShuffleSinkNode} from {@code byteBuffer}: first the location count, then that
+   * many downstream channel locations, and finally the plan node id.
+   *
+   * @param byteBuffer buffer to read from, starting at the location count
+   * @return the reconstructed node, created without children
+   */
+  public static ShuffleSinkNode deserialize(ByteBuffer byteBuffer) {
+    final List<DownStreamChannelLocation> locations = new ArrayList<>();
+    for (int remaining = ReadWriteIOUtils.readInt(byteBuffer); remaining > 0; remaining--) {
+      locations.add(DownStreamChannelLocation.deserialize(byteBuffer));
+    }
+    return new ShuffleSinkNode(PlanNodeId.deserialize(byteBuffer), locations);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 8bcb1b300b..1ac1c32e91 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index 674a80ce3e..aa15f19967 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
index 786d5b1830..3112e64f4b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -68,7 +72,7 @@ public class MPPDataExchangeManagerTest {
ISourceHandle localSourceHandle =
mppDataExchangeManager.createLocalSourceHandleForFragment(
- remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, t -> {});
+ remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, 0, t -> {});
Assert.assertTrue(localSourceHandle instanceof LocalSourceHandle);
@@ -101,7 +105,7 @@ public class MPPDataExchangeManagerTest {
ISourceHandle localSourceHandle =
mppDataExchangeManager.createLocalSourceHandleForFragment(
- localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId, t -> {});
+ localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId, 0, t -> {});
Assert.assertTrue(localSourceHandle instanceof LocalSourceHandle);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index d50e69dd2a..67209c760d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.SinkHandle;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index d0b8f49f29..8d6e7cd7d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
@@ -113,6 +114,7 @@ public class SourceHandleTest {
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
+ 0,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
mockTsBlockSerde,
@@ -228,6 +230,7 @@ public class SourceHandleTest {
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
+ 0,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
mockTsBlockSerde,
@@ -388,6 +391,7 @@ public class SourceHandleTest {
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
+ 0,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
mockTsBlockSerde,
@@ -560,6 +564,7 @@ public class SourceHandleTest {
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
+ 0,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
mockTsBlockSerde,
@@ -647,6 +652,7 @@ public class SourceHandleTest {
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
+ 0,
mockLocalMemoryManager,
Executors.newSingleThreadExecutor(),
mockTsBlockSerde,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index 0e7b5ffb96..b3b6437769 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.exchange;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -63,11 +64,6 @@ public class StubSinkHandle implements ISinkHandle {
this.tsBlocks.add(tsBlock);
}
- @Override
- public void send(int partition, List<TsBlock> tsBlocks) {
- this.tsBlocks.addAll(tsBlocks);
- }
-
@Override
public void setNoMoreTsBlocks() {
if (closed) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/IdentitySinkNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/IdentitySinkNodeSerdeTest.java
new file mode 100644
index 0000000000..b09f34ea5d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/IdentitySinkNodeSerdeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.node.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.IdentitySinkNode;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip serialization test for {@link IdentitySinkNode}. */
+public class IdentitySinkNodeSerdeTest {
+
+  /**
+   * Serializes an identity sink node with one downstream channel location and another with none,
+   * and checks that each deserialized node equals the node it was written from.
+   */
+  @Test
+  public void testSerializeAndDeserialize() throws IllegalPathException {
+    DownStreamChannelLocation downStreamChannelLocation =
+        new DownStreamChannelLocation(
+            new TEndPoint("test", 1), new TFragmentInstanceId("test", 1, "test"), "test");
+    IdentitySinkNode identitySinkNode1 =
+        new IdentitySinkNode(
+            new PlanNodeId("testIdentitySinkNode"),
+            Collections.singletonList(downStreamChannelLocation));
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    identitySinkNode1.serialize(byteBuffer);
+    byteBuffer.flip();
+    // assertEquals takes (expected, actual): the node that was written is the expectation.
+    assertEquals(identitySinkNode1, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+
+    IdentitySinkNode identitySinkNode2 =
+        new IdentitySinkNode(new PlanNodeId("testIdentitySinkNode"), Collections.emptyList());
+    ByteBuffer byteBuffer2 = ByteBuffer.allocate(1024);
+    identitySinkNode2.serialize(byteBuffer2);
+    byteBuffer2.flip();
+    assertEquals(identitySinkNode2, PlanNodeDeserializeHelper.deserialize(byteBuffer2));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/ShuffleSinkNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/ShuffleSinkNodeSerdeTest.java
new file mode 100644
index 0000000000..c3c9c69358
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/sink/ShuffleSinkNodeSerdeTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.node.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip serialization test for {@link ShuffleSinkNode}. */
+public class ShuffleSinkNodeSerdeTest {
+
+  /**
+   * Serializes a shuffle sink node with one downstream channel location and another with none,
+   * and checks that each deserialized node equals the node it was written from.
+   */
+  @Test
+  public void testSerializeAndDeserialize() throws IllegalPathException {
+    DownStreamChannelLocation downStreamChannelLocation =
+        new DownStreamChannelLocation(
+            new TEndPoint("test", 1), new TFragmentInstanceId("test", 1, "test"), "test");
+    ShuffleSinkNode shuffleSinkNode1 =
+        new ShuffleSinkNode(
+            new PlanNodeId("testShuffleSinkNode"),
+            Collections.singletonList(downStreamChannelLocation));
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    shuffleSinkNode1.serialize(byteBuffer);
+    byteBuffer.flip();
+    // assertEquals takes (expected, actual): the node that was written is the expectation.
+    assertEquals(shuffleSinkNode1, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+
+    ShuffleSinkNode shuffleSinkNode2 =
+        new ShuffleSinkNode(new PlanNodeId("testShuffleSinkNode"), Collections.emptyList());
+    ByteBuffer byteBuffer2 = ByteBuffer.allocate(1024);
+    shuffleSinkNode2.serialize(byteBuffer2);
+    byteBuffer2.flip();
+    assertEquals(shuffleSinkNode2, PlanNodeDeserializeHelper.deserialize(byteBuffer2));
+  }
+}
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index cf3fab6143..23fe7db8b0 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -63,6 +63,8 @@ struct TGetDataBlockRequest {
1: required TFragmentInstanceId sourceFragmentInstanceId
2: required i32 startSequenceId
3: required i32 endSequenceId
+ // index of upstream SinkHandle
+ 4: required i32 index
}
struct TGetDataBlockResponse {
@@ -73,6 +75,8 @@ struct TAcknowledgeDataBlockEvent {
1: required TFragmentInstanceId sourceFragmentInstanceId
2: required i32 startSequenceId
3: required i32 endSequenceId
+ // index of upstream SinkHandle
+ 4: required i32 index
}
struct TNewDataBlockEvent {