You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/24 03:40:26 UTC
[iotdb] branch fast_write_test_0423 updated: add FastInsertRowsStatement
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fast_write_test_0423 by this push:
new 7e0eb29e77 add FastInsertRowsStatement
7e0eb29e77 is described below
commit 7e0eb29e77e91ded2050e53d878378868a94ae0f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 24 11:40:13 2023 +0800
add FastInsertRowsStatement
---
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 6 ++++
.../db/mpp/plan/parser/StatementGenerator.java | 3 +-
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 7 +++--
.../plan/node/write/FastInsertRowsNode.java | 33 ++++++++++++++++++++++
.../db/mpp/plan/statement/StatementVisitor.java | 3 +-
.../crud/FastInsertRowsStatement.java} | 21 +++++---------
6 files changed, 55 insertions(+), 18 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 4733c9e08e..fe055d73ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -92,6 +92,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -1930,6 +1931,11 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
insertRowStatement, Collections.singletonList(dataPartitionQueryParam));
}
+ public Analysis visitFastInsertRows(
+ FastInsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+ return visitInsertRows(insertRowsStatement, context);
+ }
+
@Override
public Analysis visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index a5d9e25c96..5ca35248fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -386,7 +387,7 @@ public class StatementGenerator {
throws IllegalPathException, QueryProcessException {
final long startTime = System.nanoTime();
// construct insert statement
- InsertRowsStatement insertStatement = new InsertRowsStatement();
+ FastInsertRowsStatement insertStatement = new FastInsertRowsStatement();
List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
for (int i = 0; i < req.prefixPaths.size(); i++) {
FastInsertRowStatement statement = new FastInsertRowStatement();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 51cd99fd66..b603a277b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCre
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -52,6 +53,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -636,9 +638,10 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
@Override
public PlanNode visitFastInsertRows(
- InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+ FastInsertRowsStatement insertRowsStatement, MPPQueryContext context) {
// convert insert statement to insert node
- InsertRowsNode insertRowsNode = new InsertRowsNode(context.getQueryId().genPlanNodeId());
+ FastInsertRowsNode insertRowsNode =
+ new FastInsertRowsNode(context.getQueryId().genPlanNodeId());
for (int i = 0; i < insertRowsStatement.getInsertRowStatementList().size(); i++) {
InsertRowStatement fastInsertRowStatement =
insertRowsStatement.getInsertRowStatementList().get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index e47fd9434b..aaa3815f63 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -19,9 +19,16 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class FastInsertRowsNode extends InsertRowsNode {
public FastInsertRowsNode(PlanNodeId id) {
@@ -34,4 +41,30 @@ public class FastInsertRowsNode extends InsertRowsNode {
List<InsertRowNode> fastInsertRowNodeList) {
super(id, insertRowNodeIndexList, fastInsertRowNodeList);
}
+
+ @Override
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
+ Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
+ for (int i = 0; i < getInsertRowNodeList().size(); i++) {
+ InsertRowNode insertRowNode = getInsertRowNodeList().get(i);
+ // data region for insert row node
+ TRegionReplicaSet dataRegionReplicaSet =
+ analysis
+ .getDataPartitionInfo()
+ .getDataRegionReplicaSetForWriting(
+ insertRowNode.devicePath.getFullPath(),
+ TimePartitionUtils.getTimePartition(insertRowNode.getTime()));
+ if (splitMap.containsKey(dataRegionReplicaSet)) {
+ InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
+ tmpNode.addOneInsertRowNode(insertRowNode, i);
+ } else {
+ InsertRowsNode tmpNode = new FastInsertRowsNode(this.getPlanNodeId());
+ tmpNode.setDataRegionReplicaSet(dataRegionReplicaSet);
+ tmpNode.addOneInsertRowNode(insertRowNode, i);
+ splitMap.put(dataRegionReplicaSet, tmpNode);
+ }
+ }
+
+ return new ArrayList<>(splitMap.values());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 2e77c0b371..613b32ec80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.statement;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -326,7 +327,7 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(insertRowsStatement, context);
}
- public R visitFastInsertRows(InsertRowsStatement insertRowsStatement, C context) {
+ public R visitFastInsertRows(FastInsertRowsStatement insertRowsStatement, C context) {
return visitStatement(insertRowsStatement, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowsStatement.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowsStatement.java
index e47fd9434b..c24faf318f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowsStatement.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -17,21 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
+package org.apache.iotdb.db.mpp.plan.statement.crud;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
-import java.util.List;
+public class FastInsertRowsStatement extends InsertRowsStatement {
-public class FastInsertRowsNode extends InsertRowsNode {
- public FastInsertRowsNode(PlanNodeId id) {
- super(id);
- }
-
- public FastInsertRowsNode(
- PlanNodeId id,
- List<Integer> insertRowNodeIndexList,
- List<InsertRowNode> fastInsertRowNodeList) {
- super(id, insertRowNodeIndexList, fastInsertRowNodeList);
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitFastInsertRows(this, context);
}
}