You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/13 14:43:51 UTC

[iotdb] 03/03: add UT for DistributedPlan Write

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b8a351c26ad4f3793343c0f2f809e4898e40261d
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 13 22:43:35 2022 +0800

    add UT for DistributedPlan Write
---
 .../iotdb/commons/partition/DataPartition.java     | 11 ++++-
 .../planner/plan/WriteFragmentParallelPlanner.java |  2 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 57 ++++++++++++++++++++--
 3 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 3f677ce2c9..f352d7cdeb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -68,7 +68,10 @@ public class DataPartition {
     // A list of data region replica sets will store data in a same time partition.
     // We will insert data to the last set in the list.
     // TODO return the latest dataRegionReplicaSet for each time partition
-    return Collections.emptyList();
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+    // IMPORTANT TODO: (xingtanzjr) handle write operations where a single time slot maps to more than one Region
+    return dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream().filter(entry -> timePartitionSlotList.contains(entry.getKey())).flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
   }
 
   public RegionReplicaSet getDataRegionReplicaSetForWriting(
@@ -76,7 +79,11 @@ public class DataPartition {
     // A list of data region replica sets will store data in a same time partition.
     // We will insert data to the last set in the list.
     // TODO return the latest dataRegionReplicaSet for each time partition
-    return null;
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+    List<RegionReplicaSet> regions = dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream().filter(entry -> entry.getKey().equals(timePartitionSlot)).flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
+    // IMPORTANT TODO: (xingtanzjr) handle write operations where a single time slot maps to more than one Region
+    return regions.get(0);
   }
 
   private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
index d8c9435214..a31ac6f25f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
@@ -62,7 +62,7 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
               fragment.getId().genFragmentInstanceId(),
               timeFilter,
               queryContext.getQueryType());
-      instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet());
+      instance.setDataRegionAndHost(split.getRegionReplicaSet());
       ret.add(instance);
     }
     return ret;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 39d9c6a71d..b4b77956f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -257,7 +258,7 @@ public class DistributionPlannerTest {
   }
 
   @Test
-  public void TestWriteParallelPlan() throws IllegalPathException {
+  public void TestInsertRowNodeParallelPlan() throws IllegalPathException {
     QueryId queryId = new QueryId("test_write");
     InsertRowNode insertRowNode =
         new InsertRowNode(
@@ -282,6 +283,48 @@ public class DistributionPlannerTest {
     assertEquals(1, plan.getInstances().size());
   }
 
+  @Test
+  public void TestInsertRowsNodeParallelPlan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_write");
+    InsertRowNode insertRowNode1 =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1"),
+            false,
+            new MeasurementSchema[] {
+                new MeasurementSchema("s1", TSDataType.INT32),
+            },
+            new TSDataType[] {TSDataType.INT32},
+            1L,
+            new Object[] {10});
+
+    InsertRowNode insertRowNode2 =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1"),
+            false,
+            new MeasurementSchema[] {
+                new MeasurementSchema("s1", TSDataType.INT32),
+            },
+            new TSDataType[] {TSDataType.INT32},
+            100000L,
+            new Object[] {10});
+
+    InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
+    node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2));
+    node.setInsertRowNodeIndexList(Arrays.asList(0, 1));
+
+    Analysis analysis = constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
+    DistributedQueryPlan plan = planner.planFragments();
+    plan.getInstances().forEach(System.out::println);
+    assertEquals(1, plan.getInstances().size());
+  }
+
   private Analysis constructAnalysis() {
     Analysis analysis = new Analysis();
 
@@ -295,21 +338,25 @@ public class DistributionPlannerTest {
     Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> sgPartitionMap =
         new HashMap<>();
 
-    List<RegionReplicaSet> d1DataRegions = new ArrayList<>();
-    d1DataRegions.add(
+    List<RegionReplicaSet> d1DataRegion1 = new ArrayList<>();
+    d1DataRegion1.add(
         new RegionReplicaSet(
             new DataRegionId(1),
             Arrays.asList(
                 new DataNodeLocation(11, new Endpoint("192.0.1.1", 9000)),
                 new DataNodeLocation(12, new Endpoint("192.0.1.2", 9000)))));
-    d1DataRegions.add(
+
+    List<RegionReplicaSet> d1DataRegion2 = new ArrayList<>();
+    d1DataRegion2.add(
         new RegionReplicaSet(
             new DataRegionId(2),
             Arrays.asList(
                 new DataNodeLocation(21, new Endpoint("192.0.2.1", 9000)),
                 new DataNodeLocation(22, new Endpoint("192.0.2.2", 9000)))));
+
     Map<TimePartitionSlot, List<RegionReplicaSet>> d1DataRegionMap = new HashMap<>();
-    d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegions);
+    d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegion1);
+    d1DataRegionMap.put(new TimePartitionSlot(1), d1DataRegion2);
 
     List<RegionReplicaSet> d2DataRegions = new ArrayList<>();
     d2DataRegions.add(