You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/24 03:40:26 UTC

[iotdb] branch fast_write_test_0423 updated: add FastInsertRowsStatement

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fast_write_test_0423 by this push:
     new 7e0eb29e77 add FastInsertRowsStatement
7e0eb29e77 is described below

commit 7e0eb29e77e91ded2050e53d878378868a94ae0f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 24 11:40:13 2023 +0800

    add FastInsertRowsStatement
---
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  6 ++++
 .../db/mpp/plan/parser/StatementGenerator.java     |  3 +-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  7 +++--
 .../plan/node/write/FastInsertRowsNode.java        | 33 ++++++++++++++++++++++
 .../db/mpp/plan/statement/StatementVisitor.java    |  3 +-
 .../crud/FastInsertRowsStatement.java}             | 21 +++++---------
 6 files changed, 55 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 4733c9e08e..fe055d73ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -92,6 +92,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -1930,6 +1931,11 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         insertRowStatement, Collections.singletonList(dataPartitionQueryParam));
   }
 
+  public Analysis visitFastInsertRows(
+      FastInsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+    return visitInsertRows(insertRowsStatement, context);
+  }
+
   @Override
   public Analysis visitInsertRows(
       InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index a5d9e25c96..5ca35248fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -386,7 +387,7 @@ public class StatementGenerator {
       throws IllegalPathException, QueryProcessException {
     final long startTime = System.nanoTime();
     // construct insert statement
-    InsertRowsStatement insertStatement = new InsertRowsStatement();
+    FastInsertRowsStatement insertStatement = new FastInsertRowsStatement();
     List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
     for (int i = 0; i < req.prefixPaths.size(); i++) {
       FastInsertRowStatement statement = new FastInsertRowStatement();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 51cd99fd66..b603a277b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -52,6 +53,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -636,9 +638,10 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
 
   @Override
   public PlanNode visitFastInsertRows(
-      InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+      FastInsertRowsStatement insertRowsStatement, MPPQueryContext context) {
     // convert insert statement to insert node
-    InsertRowsNode insertRowsNode = new InsertRowsNode(context.getQueryId().genPlanNodeId());
+    FastInsertRowsNode insertRowsNode =
+        new FastInsertRowsNode(context.getQueryId().genPlanNodeId());
     for (int i = 0; i < insertRowsStatement.getInsertRowStatementList().size(); i++) {
       InsertRowStatement fastInsertRowStatement =
           insertRowsStatement.getInsertRowStatementList().get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index e47fd9434b..aaa3815f63 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -19,9 +19,16 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class FastInsertRowsNode extends InsertRowsNode {
   public FastInsertRowsNode(PlanNodeId id) {
@@ -34,4 +41,30 @@ public class FastInsertRowsNode extends InsertRowsNode {
       List<InsertRowNode> fastInsertRowNodeList) {
     super(id, insertRowNodeIndexList, fastInsertRowNodeList);
   }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
+    for (int i = 0; i < getInsertRowNodeList().size(); i++) {
+      InsertRowNode insertRowNode = getInsertRowNodeList().get(i);
+      // data region for insert row node
+      TRegionReplicaSet dataRegionReplicaSet =
+          analysis
+              .getDataPartitionInfo()
+              .getDataRegionReplicaSetForWriting(
+                  insertRowNode.devicePath.getFullPath(),
+                  TimePartitionUtils.getTimePartition(insertRowNode.getTime()));
+      if (splitMap.containsKey(dataRegionReplicaSet)) {
+        InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
+        tmpNode.addOneInsertRowNode(insertRowNode, i);
+      } else {
+        InsertRowsNode tmpNode = new FastInsertRowsNode(this.getPlanNodeId());
+        tmpNode.setDataRegionReplicaSet(dataRegionReplicaSet);
+        tmpNode.addOneInsertRowNode(insertRowNode, i);
+        splitMap.put(dataRegionReplicaSet, tmpNode);
+      }
+    }
+
+    return new ArrayList<>(splitMap.values());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 2e77c0b371..613b32ec80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.statement;
 
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FastInsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -326,7 +327,7 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(insertRowsStatement, context);
   }
 
-  public R visitFastInsertRows(InsertRowsStatement insertRowsStatement, C context) {
+  public R visitFastInsertRows(FastInsertRowsStatement insertRowsStatement, C context) {
     return visitStatement(insertRowsStatement, context);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowsStatement.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowsStatement.java
index e47fd9434b..c24faf318f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowsStatement.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -17,21 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
+package org.apache.iotdb.db.mpp.plan.statement.crud;
 
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 
-import java.util.List;
+public class FastInsertRowsStatement extends InsertRowsStatement {
 
-public class FastInsertRowsNode extends InsertRowsNode {
-  public FastInsertRowsNode(PlanNodeId id) {
-    super(id);
-  }
-
-  public FastInsertRowsNode(
-      PlanNodeId id,
-      List<Integer> insertRowNodeIndexList,
-      List<InsertRowNode> fastInsertRowNodeList) {
-    super(id, insertRowNodeIndexList, fastInsertRowNodeList);
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitFastInsertRows(this, context);
   }
 }