You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/13 14:43:51 UTC
[iotdb] 03/03: add UT for DistributedPlan Write
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b8a351c26ad4f3793343c0f2f809e4898e40261d
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 13 22:43:35 2022 +0800
add UT for DistributedPlan Write
---
.../iotdb/commons/partition/DataPartition.java | 11 ++++-
.../planner/plan/WriteFragmentParallelPlanner.java | 2 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 57 ++++++++++++++++++++--
3 files changed, 62 insertions(+), 8 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 3f677ce2c9..f352d7cdeb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -68,7 +68,10 @@ public class DataPartition {
// A list of data region replica sets will store data in a same time partition.
// We will insert data to the last set in the list.
// TODO return the latest dataRegionReplicaSet for each time partition
- return Collections.emptyList();
+ String storageGroup = getStorageGroupByDevice(deviceName);
+ SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+ // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are more than 1 Regions for one timeSlot
+ return dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream().filter(entry -> timePartitionSlotList.contains(entry.getKey())).flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
}
public RegionReplicaSet getDataRegionReplicaSetForWriting(
@@ -76,7 +79,11 @@ public class DataPartition {
// A list of data region replica sets will store data in a same time partition.
// We will insert data to the last set in the list.
// TODO return the latest dataRegionReplicaSet for each time partition
- return null;
+ String storageGroup = getStorageGroupByDevice(deviceName);
+ SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+ List<RegionReplicaSet> regions = dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream().filter(entry -> entry.getKey().equals(timePartitionSlot)).flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
+ // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are more than 1 Regions for one timeSlot
+ return regions.get(0);
}
private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
index d8c9435214..a31ac6f25f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
@@ -62,7 +62,7 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
fragment.getId().genFragmentInstanceId(),
timeFilter,
queryContext.getQueryType());
- instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet());
+ instance.setDataRegionAndHost(split.getRegionReplicaSet());
ret.add(instance);
}
return ret;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 39d9c6a71d..b4b77956f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -257,7 +258,7 @@ public class DistributionPlannerTest {
}
@Test
- public void TestWriteParallelPlan() throws IllegalPathException {
+ public void TestInsertRowNodeParallelPlan() throws IllegalPathException {
QueryId queryId = new QueryId("test_write");
InsertRowNode insertRowNode =
new InsertRowNode(
@@ -282,6 +283,48 @@ public class DistributionPlannerTest {
assertEquals(1, plan.getInstances().size());
}
+ @Test
+ public void TestInsertRowsNodeParallelPlan() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_write");
+ InsertRowNode insertRowNode1 =
+ new InsertRowNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1"),
+ false,
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ },
+ new TSDataType[] {TSDataType.INT32},
+ 1L,
+ new Object[] {10});
+
+ InsertRowNode insertRowNode2 =
+ new InsertRowNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1"),
+ false,
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ },
+ new TSDataType[] {TSDataType.INT32},
+ 100000L,
+ new Object[] {10});
+
+ InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
+ node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2));
+ node.setInsertRowNodeIndexList(Arrays.asList(0, 1));
+
+ Analysis analysis = constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
+ DistributedQueryPlan plan = planner.planFragments();
+ plan.getInstances().forEach(System.out::println);
+ assertEquals(1, plan.getInstances().size());
+ }
+
private Analysis constructAnalysis() {
Analysis analysis = new Analysis();
@@ -295,21 +338,25 @@ public class DistributionPlannerTest {
Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> sgPartitionMap =
new HashMap<>();
- List<RegionReplicaSet> d1DataRegions = new ArrayList<>();
- d1DataRegions.add(
+ List<RegionReplicaSet> d1DataRegion1 = new ArrayList<>();
+ d1DataRegion1.add(
new RegionReplicaSet(
new DataRegionId(1),
Arrays.asList(
new DataNodeLocation(11, new Endpoint("192.0.1.1", 9000)),
new DataNodeLocation(12, new Endpoint("192.0.1.2", 9000)))));
- d1DataRegions.add(
+
+ List<RegionReplicaSet> d1DataRegion2 = new ArrayList<>();
+ d1DataRegion2.add(
new RegionReplicaSet(
new DataRegionId(2),
Arrays.asList(
new DataNodeLocation(21, new Endpoint("192.0.2.1", 9000)),
new DataNodeLocation(22, new Endpoint("192.0.2.2", 9000)))));
+
Map<TimePartitionSlot, List<RegionReplicaSet>> d1DataRegionMap = new HashMap<>();
- d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegions);
+ d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegion1);
+ d1DataRegionMap.put(new TimePartitionSlot(1), d1DataRegion2);
List<RegionReplicaSet> d2DataRegions = new ArrayList<>();
d2DataRegions.add(