You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/28 10:04:40 UTC

[iotdb] branch master updated: Fix bugs of batch insert (#5714)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d89eed024 Fix bugs of batch insert (#5714)
6d89eed024 is described below

commit 6d89eed0248b68372babdc22a211d12dd06b5c9a
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Thu Apr 28 18:04:33 2022 +0800

    Fix bugs of batch insert (#5714)
---
 .../iotdb/db/mpp/sql/analyze/SchemaValidator.java  |  14 +--
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |  13 ++-
 .../plan/node/write/BatchInsertNode.java}          |   9 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |  10 ++-
 .../planner/plan/node/write/InsertRowsNode.java    |  10 ++-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java | 100 +++++++++++++++++----
 .../write/InsertRowsOfOneDeviceNodeSerdeTest.java  |  19 ++--
 7 files changed, 134 insertions(+), 41 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java
index 153da87d8e..112e9ede59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.mpp.sql.analyze;
 
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.BatchInsertNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
 
 public class SchemaValidator {
 
@@ -31,14 +31,14 @@ public class SchemaValidator {
   public static SchemaTree validate(InsertNode insertNode) {
 
     SchemaTree schemaTree;
-    if (insertNode instanceof BatchInsert) {
-      BatchInsert batchInsert = (BatchInsert) insertNode;
+    if (insertNode instanceof BatchInsertNode) {
+      BatchInsertNode batchInsertNode = (BatchInsertNode) insertNode;
       schemaTree =
           schemaFetcher.fetchSchemaListWithAutoCreate(
-              batchInsert.getDevicePaths(),
-              batchInsert.getMeasurementsList(),
-              batchInsert.getDataTypesList(),
-              batchInsert.getAlignedList());
+              batchInsertNode.getDevicePaths(),
+              batchInsertNode.getMeasurementsList(),
+              batchInsertNode.getDataTypesList(),
+              batchInsertNode.getAlignedList());
     } else {
       schemaTree =
           schemaFetcher.fetchSchemaWithAutoCreate(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index 6e752421b4..51f946b2ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -341,10 +342,13 @@ public class LogicalPlanner {
       // convert insert statement to insert node
       InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode =
           new InsertRowsOfOneDeviceNode(context.getQueryId().genPlanNodeId());
+
+      List<InsertRowNode> insertRowNodeList = new ArrayList<>();
+      List<Integer> insertRowNodeIndexList = new ArrayList<>();
       for (int i = 0; i < insertRowsOfOneDeviceStatement.getInsertRowStatementList().size(); i++) {
         InsertRowStatement insertRowStatement =
             insertRowsOfOneDeviceStatement.getInsertRowStatementList().get(i);
-        insertRowsOfOneDeviceNode.addOneInsertRowNode(
+        insertRowNodeList.add(
             new InsertRowNode(
                 insertRowsOfOneDeviceNode.getPlanNodeId(),
                 insertRowStatement.getDevicePath(),
@@ -353,9 +357,12 @@ public class LogicalPlanner {
                 insertRowStatement.getDataTypes(),
                 insertRowStatement.getTime(),
                 insertRowStatement.getValues(),
-                insertRowStatement.isNeedInferType()),
-            i);
+                insertRowStatement.isNeedInferType()));
+        insertRowNodeIndexList.add(i);
       }
+
+      insertRowsOfOneDeviceNode.setInsertRowNodeList(insertRowNodeList);
+      insertRowsOfOneDeviceNode.setInsertRowNodeIndexList(insertRowNodeIndexList);
       return insertRowsOfOneDeviceNode;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/BatchInsertNode.java
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/BatchInsertNode.java
index 6592cadd52..01291b6e2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/BatchInsertNode.java
@@ -17,15 +17,18 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.sql.statement.crud;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
 
-/** BatchInsert contains multiple sub insert. */
-public interface BatchInsert {
+/**
+ * A BatchInsertNode contains multiple sub-inserts. Any insert node that contains multiple
+ * sub-insert nodes must implement this interface.
+ */
+public interface BatchInsertNode {
 
   List<PartialPath> getDevicePaths();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 49088f418d..6e26e0ef2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -40,7 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class InsertMultiTabletsNode extends InsertNode implements BatchInsert {
+public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNode {
 
   /**
    * the value is used to indict the parent InsertTabletNode's index when the parent
@@ -191,6 +190,13 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsert {
     return null;
   }
 
+  @Override
+  public void setMeasurementSchemas(SchemaTree schemaTree) {
+    for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+      insertTabletNode.setMeasurementSchemas(schemaTree);
+    }
+  }
+
   @Override
   public List<PartialPath> getDevicePaths() {
     List<PartialPath> partialPaths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index eccede30d9..07d91a040b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -41,7 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class InsertRowsNode extends InsertNode implements BatchInsert {
+public class InsertRowsNode extends InsertNode implements BatchInsertNode {
 
   /**
    * Suppose there is an InsertRowsNode, which contains 5 InsertRowNodes,
@@ -114,6 +113,13 @@ public class InsertRowsNode extends InsertNode implements BatchInsert {
     return true;
   }
 
+  @Override
+  public void setMeasurementSchemas(SchemaTree schemaTree) {
+    for (InsertRowNode insertRowNode : insertRowNodeList) {
+      insertRowNode.setMeasurementSchemas(schemaTree);
+    }
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 1705d10f38..f8adf383c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -37,12 +37,15 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
-public class InsertRowsOfOneDeviceNode extends InsertNode {
+public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsertNode {
 
   /**
    * Suppose there is an InsertRowsOfOneDeviceNode, which contains 5 InsertRowNodes,
@@ -91,6 +94,25 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
 
   public void setInsertRowNodeList(List<InsertRowNode> insertRowNodeList) {
     this.insertRowNodeList = insertRowNodeList;
+
+    if (insertRowNodeList == null || insertRowNodeList.isEmpty()) {
+      return;
+    }
+
+    devicePath = insertRowNodeList.get(0).getDevicePath();
+    isAligned = insertRowNodeList.get(0).isAligned;
+    Map<String, TSDataType> measurementsAndDataType = new HashMap<>();
+    for (InsertRowNode insertRowNode : insertRowNodeList) {
+      List<String> measurements = Arrays.asList(insertRowNode.getMeasurements());
+      Map<String, TSDataType> subMap =
+          measurements.stream()
+              .collect(
+                  Collectors.toMap(
+                      key -> key, key -> insertRowNode.dataTypes[measurements.indexOf(key)]));
+      measurementsAndDataType.putAll(subMap);
+    }
+    measurements = measurementsAndDataType.keySet().toArray(new String[0]);
+    dataTypes = measurementsAndDataType.values().toArray(new TSDataType[0]);
   }
 
   @Override
@@ -136,35 +158,45 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
     return true;
   }
 
+  @Override
+  public void setMeasurementSchemas(SchemaTree schemaTree) {
+    for (InsertRowNode insertRowNode : insertRowNodeList) {
+      insertRowNode.setMeasurementSchemas(schemaTree);
+    }
+  }
+
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
+    List<WritePlanNode> result = new ArrayList<>();
+
+    Map<TRegionReplicaSet, List<InsertRowNode>> splitMap = new HashMap<>();
+    Map<TRegionReplicaSet, List<Integer>> splitMapForIndex = new HashMap<>();
+
     for (int i = 0; i < insertRowNodeList.size(); i++) {
       InsertRowNode insertRowNode = insertRowNodeList.get(i);
-      // data region for insert row node
       TRegionReplicaSet dataRegionReplicaSet =
           analysis
               .getDataPartitionInfo()
               .getDataRegionReplicaSetForWriting(
                   devicePath.getFullPath(),
                   StorageEngineV2.getTimePartitionSlot(insertRowNode.getTime()));
-      if (splitMap.containsKey(dataRegionReplicaSet)) {
-        InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
-        tmpNode.addOneInsertRowNode(insertRowNode, i);
-      } else {
-        InsertRowsNode tmpNode = new InsertRowsNode(this.getPlanNodeId());
-        tmpNode.setDataRegionReplicaSet(dataRegionReplicaSet);
-        tmpNode.addOneInsertRowNode(insertRowNode, i);
-        splitMap.put(dataRegionReplicaSet, tmpNode);
-      }
-    }
+      List<InsertRowNode> tmpMap =
+          splitMap.computeIfAbsent(dataRegionReplicaSet, k -> new ArrayList<>());
+      List<Integer> tmpIndexMap =
+          splitMapForIndex.computeIfAbsent(dataRegionReplicaSet, k -> new ArrayList<>());
 
-    return new ArrayList<>(splitMap.values());
-  }
+      tmpMap.add(insertRowNode);
+      tmpIndexMap.add(insertRowNodeIndexList.get(i));
+    }
 
-  public void addOneInsertRowNode(InsertRowNode node, int index) {
-    insertRowNodeList.add(node);
-    insertRowNodeIndexList.add(index);
+    for (Map.Entry<TRegionReplicaSet, List<InsertRowNode>> entry : splitMap.entrySet()) {
+      InsertRowsOfOneDeviceNode reducedNode = new InsertRowsOfOneDeviceNode(this.getPlanNodeId());
+      reducedNode.setInsertRowNodeList(entry.getValue());
+      reducedNode.setInsertRowNodeIndexList(splitMapForIndex.get(entry.getKey()));
+      reducedNode.setDataRegionReplicaSet(entry.getKey());
+      result.add(reducedNode);
+    }
+    return result;
   }
 
   public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
@@ -233,4 +265,36 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   public int hashCode() {
     return Objects.hash(super.hashCode(), insertRowNodeIndexList, insertRowNodeList);
   }
+
+  @Override
+  public List<PartialPath> getDevicePaths() {
+    if (insertRowNodeList == null || insertRowNodeList.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(insertRowNodeList.get(0).devicePath);
+  }
+
+  @Override
+  public List<String[]> getMeasurementsList() {
+    if (insertRowNodeList == null || insertRowNodeList.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(measurements);
+  }
+
+  @Override
+  public List<TSDataType[]> getDataTypesList() {
+    if (insertRowNodeList == null || insertRowNodeList.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(dataTypes);
+  }
+
+  @Override
+  public List<Boolean> getAlignedList() {
+    if (insertRowNodeList == null || insertRowNodeList.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(insertRowNodeList.get(0).isAligned);
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
index a93cf246a0..f97a709159 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
@@ -31,6 +31,8 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 public class InsertRowsOfOneDeviceNodeSerdeTest {
 
@@ -39,7 +41,9 @@ public class InsertRowsOfOneDeviceNodeSerdeTest {
     PartialPath device = new PartialPath("root.sg.d");
     InsertRowsOfOneDeviceNode node = new InsertRowsOfOneDeviceNode(new PlanNodeId("plan node 1"));
     node.setDevicePath(device);
-    node.addOneInsertRowNode(
+    List<InsertRowNode> insertRowNodeList = new ArrayList<>();
+    List<Integer> insertRowNodeIndexList = new ArrayList<>();
+    insertRowNodeList.add(
         new InsertRowNode(
             new PlanNodeId("plan node 1"),
             device,
@@ -48,10 +52,9 @@ public class InsertRowsOfOneDeviceNodeSerdeTest {
             new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64},
             1000L,
             new Object[] {1.0, 2f, 300L},
-            false),
-        0);
+            false));
 
-    node.addOneInsertRowNode(
+    insertRowNodeList.add(
         new InsertRowNode(
             new PlanNodeId("plan node 1"),
             device,
@@ -60,8 +63,12 @@ public class InsertRowsOfOneDeviceNodeSerdeTest {
             new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
             2000L,
             new Object[] {2.0, false},
-            false),
-        1);
+            false));
+    insertRowNodeIndexList.add(0);
+    insertRowNodeIndexList.add(1);
+
+    node.setInsertRowNodeList(insertRowNodeList);
+    node.setInsertRowNodeIndexList(insertRowNodeIndexList);
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
     node.serialize(byteBuffer);