You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/28 10:04:40 UTC
[iotdb] branch master updated: Fix bugs of batch insert (#5714)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6d89eed024 Fix bugs of batch insert (#5714)
6d89eed024 is described below
commit 6d89eed0248b68372babdc22a211d12dd06b5c9a
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Thu Apr 28 18:04:33 2022 +0800
Fix bugs of batch insert (#5714)
---
.../iotdb/db/mpp/sql/analyze/SchemaValidator.java | 14 +--
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 13 ++-
.../plan/node/write/BatchInsertNode.java} | 9 +-
.../plan/node/write/InsertMultiTabletsNode.java | 10 ++-
.../planner/plan/node/write/InsertRowsNode.java | 10 ++-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 100 +++++++++++++++++----
.../write/InsertRowsOfOneDeviceNodeSerdeTest.java | 19 ++--
7 files changed, 134 insertions(+), 41 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java
index 153da87d8e..112e9ede59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.mpp.sql.analyze;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.BatchInsertNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
public class SchemaValidator {
@@ -31,14 +31,14 @@ public class SchemaValidator {
public static SchemaTree validate(InsertNode insertNode) {
SchemaTree schemaTree;
- if (insertNode instanceof BatchInsert) {
- BatchInsert batchInsert = (BatchInsert) insertNode;
+ if (insertNode instanceof BatchInsertNode) {
+ BatchInsertNode batchInsertNode = (BatchInsertNode) insertNode;
schemaTree =
schemaFetcher.fetchSchemaListWithAutoCreate(
- batchInsert.getDevicePaths(),
- batchInsert.getMeasurementsList(),
- batchInsert.getDataTypesList(),
- batchInsert.getAlignedList());
+ batchInsertNode.getDevicePaths(),
+ batchInsertNode.getMeasurementsList(),
+ batchInsertNode.getDataTypesList(),
+ batchInsertNode.getAlignedList());
} else {
schemaTree =
schemaFetcher.fetchSchemaWithAutoCreate(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index 6e752421b4..51f946b2ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -341,10 +342,13 @@ public class LogicalPlanner {
// convert insert statement to insert node
InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode =
new InsertRowsOfOneDeviceNode(context.getQueryId().genPlanNodeId());
+
+ List<InsertRowNode> insertRowNodeList = new ArrayList<>();
+ List<Integer> insertRowNodeIndexList = new ArrayList<>();
for (int i = 0; i < insertRowsOfOneDeviceStatement.getInsertRowStatementList().size(); i++) {
InsertRowStatement insertRowStatement =
insertRowsOfOneDeviceStatement.getInsertRowStatementList().get(i);
- insertRowsOfOneDeviceNode.addOneInsertRowNode(
+ insertRowNodeList.add(
new InsertRowNode(
insertRowsOfOneDeviceNode.getPlanNodeId(),
insertRowStatement.getDevicePath(),
@@ -353,9 +357,12 @@ public class LogicalPlanner {
insertRowStatement.getDataTypes(),
insertRowStatement.getTime(),
insertRowStatement.getValues(),
- insertRowStatement.isNeedInferType()),
- i);
+ insertRowStatement.isNeedInferType()));
+ insertRowNodeIndexList.add(i);
}
+
+ insertRowsOfOneDeviceNode.setInsertRowNodeList(insertRowNodeList);
+ insertRowsOfOneDeviceNode.setInsertRowNodeIndexList(insertRowNodeIndexList);
return insertRowsOfOneDeviceNode;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/BatchInsertNode.java
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/BatchInsertNode.java
index 6592cadd52..01291b6e2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/BatchInsertNode.java
@@ -17,15 +17,18 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.statement.crud;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.List;
-/** BatchInsert contains multiple sub insert. */
-public interface BatchInsert {
+/**
+ * BatchInsertNode contains multiple sub insert. Insert node which contains multiple sub insert
+ * nodes needs to implement it.
+ */
+public interface BatchInsertNode {
List<PartialPath> getDevicePaths();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 49088f418d..6e26e0ef2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -40,7 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class InsertMultiTabletsNode extends InsertNode implements BatchInsert {
+public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNode {
/**
* the value is used to indict the parent InsertTabletNode's index when the parent
@@ -191,6 +190,13 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsert {
return null;
}
+ @Override
+ public void setMeasurementSchemas(SchemaTree schemaTree) {
+ for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+ insertTabletNode.setMeasurementSchemas(schemaTree);
+ }
+ }
+
@Override
public List<PartialPath> getDevicePaths() {
List<PartialPath> partialPaths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index eccede30d9..07d91a040b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -41,7 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class InsertRowsNode extends InsertNode implements BatchInsert {
+public class InsertRowsNode extends InsertNode implements BatchInsertNode {
/**
* Suppose there is an InsertRowsNode, which contains 5 InsertRowNodes,
@@ -114,6 +113,13 @@ public class InsertRowsNode extends InsertNode implements BatchInsert {
return true;
}
+ @Override
+ public void setMeasurementSchemas(SchemaTree schemaTree) {
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ insertRowNode.setMeasurementSchemas(schemaTree);
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 1705d10f38..f8adf383c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -37,12 +37,15 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
-public class InsertRowsOfOneDeviceNode extends InsertNode {
+public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsertNode {
/**
* Suppose there is an InsertRowsOfOneDeviceNode, which contains 5 InsertRowNodes,
@@ -91,6 +94,25 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
public void setInsertRowNodeList(List<InsertRowNode> insertRowNodeList) {
this.insertRowNodeList = insertRowNodeList;
+
+ if (insertRowNodeList == null || insertRowNodeList.isEmpty()) {
+ return;
+ }
+
+ devicePath = insertRowNodeList.get(0).getDevicePath();
+ isAligned = insertRowNodeList.get(0).isAligned;
+ Map<String, TSDataType> measurementsAndDataType = new HashMap<>();
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ List<String> measurements = Arrays.asList(insertRowNode.getMeasurements());
+ Map<String, TSDataType> subMap =
+ measurements.stream()
+ .collect(
+ Collectors.toMap(
+ key -> key, key -> insertRowNode.dataTypes[measurements.indexOf(key)]));
+ measurementsAndDataType.putAll(subMap);
+ }
+ measurements = measurementsAndDataType.keySet().toArray(new String[0]);
+ dataTypes = measurementsAndDataType.values().toArray(new TSDataType[0]);
}
@Override
@@ -136,35 +158,45 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
return true;
}
+ @Override
+ public void setMeasurementSchemas(SchemaTree schemaTree) {
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ insertRowNode.setMeasurementSchemas(schemaTree);
+ }
+ }
+
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
- Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
+ List<WritePlanNode> result = new ArrayList<>();
+
+ Map<TRegionReplicaSet, List<InsertRowNode>> splitMap = new HashMap<>();
+ Map<TRegionReplicaSet, List<Integer>> splitMapForIndex = new HashMap<>();
+
for (int i = 0; i < insertRowNodeList.size(); i++) {
InsertRowNode insertRowNode = insertRowNodeList.get(i);
- // data region for insert row node
TRegionReplicaSet dataRegionReplicaSet =
analysis
.getDataPartitionInfo()
.getDataRegionReplicaSetForWriting(
devicePath.getFullPath(),
StorageEngineV2.getTimePartitionSlot(insertRowNode.getTime()));
- if (splitMap.containsKey(dataRegionReplicaSet)) {
- InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
- tmpNode.addOneInsertRowNode(insertRowNode, i);
- } else {
- InsertRowsNode tmpNode = new InsertRowsNode(this.getPlanNodeId());
- tmpNode.setDataRegionReplicaSet(dataRegionReplicaSet);
- tmpNode.addOneInsertRowNode(insertRowNode, i);
- splitMap.put(dataRegionReplicaSet, tmpNode);
- }
- }
+ List<InsertRowNode> tmpMap =
+ splitMap.computeIfAbsent(dataRegionReplicaSet, k -> new ArrayList<>());
+ List<Integer> tmpIndexMap =
+ splitMapForIndex.computeIfAbsent(dataRegionReplicaSet, k -> new ArrayList<>());
- return new ArrayList<>(splitMap.values());
- }
+ tmpMap.add(insertRowNode);
+ tmpIndexMap.add(insertRowNodeIndexList.get(i));
+ }
- public void addOneInsertRowNode(InsertRowNode node, int index) {
- insertRowNodeList.add(node);
- insertRowNodeIndexList.add(index);
+ for (Map.Entry<TRegionReplicaSet, List<InsertRowNode>> entry : splitMap.entrySet()) {
+ InsertRowsOfOneDeviceNode reducedNode = new InsertRowsOfOneDeviceNode(this.getPlanNodeId());
+ reducedNode.setInsertRowNodeList(entry.getValue());
+ reducedNode.setInsertRowNodeIndexList(splitMapForIndex.get(entry.getKey()));
+ reducedNode.setDataRegionReplicaSet(entry.getKey());
+ result.add(reducedNode);
+ }
+ return result;
}
public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
@@ -233,4 +265,36 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
public int hashCode() {
return Objects.hash(super.hashCode(), insertRowNodeIndexList, insertRowNodeList);
}
+
+ @Override
+ public List<PartialPath> getDevicePaths() {
+ if (insertRowNodeList == null || insertRowNodeList.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return Collections.singletonList(insertRowNodeList.get(0).devicePath);
+ }
+
+ @Override
+ public List<String[]> getMeasurementsList() {
+ if (insertRowNodeList == null || insertRowNodeList.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return Collections.singletonList(measurements);
+ }
+
+ @Override
+ public List<TSDataType[]> getDataTypesList() {
+ if (insertRowNodeList == null || insertRowNodeList.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return Collections.singletonList(dataTypes);
+ }
+
+ @Override
+ public List<Boolean> getAlignedList() {
+ if (insertRowNodeList == null || insertRowNodeList.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return Collections.singletonList(insertRowNodeList.get(0).isAligned);
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
index a93cf246a0..f97a709159 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
@@ -31,6 +31,8 @@ import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
public class InsertRowsOfOneDeviceNodeSerdeTest {
@@ -39,7 +41,9 @@ public class InsertRowsOfOneDeviceNodeSerdeTest {
PartialPath device = new PartialPath("root.sg.d");
InsertRowsOfOneDeviceNode node = new InsertRowsOfOneDeviceNode(new PlanNodeId("plan node 1"));
node.setDevicePath(device);
- node.addOneInsertRowNode(
+ List<InsertRowNode> insertRowNodeList = new ArrayList<>();
+ List<Integer> insertRowNodeIndexList = new ArrayList<>();
+ insertRowNodeList.add(
new InsertRowNode(
new PlanNodeId("plan node 1"),
device,
@@ -48,10 +52,9 @@ public class InsertRowsOfOneDeviceNodeSerdeTest {
new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64},
1000L,
new Object[] {1.0, 2f, 300L},
- false),
- 0);
+ false));
- node.addOneInsertRowNode(
+ insertRowNodeList.add(
new InsertRowNode(
new PlanNodeId("plan node 1"),
device,
@@ -60,8 +63,12 @@ public class InsertRowsOfOneDeviceNodeSerdeTest {
new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
2000L,
new Object[] {2.0, false},
- false),
- 1);
+ false));
+ insertRowNodeIndexList.add(0);
+ insertRowNodeIndexList.add(1);
+
+ node.setInsertRowNodeList(insertRowNodeList);
+ node.setInsertRowNodeIndexList(insertRowNodeIndexList);
ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
node.serialize(byteBuffer);