You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/09 10:09:10 UTC
[iotdb] branch master updated: [IOTDB-2827] Batch insert in new cluster (#5412)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 168cfe85ce [IOTDB-2827] Batch insert in new cluster (#5412)
168cfe85ce is described below
commit 168cfe85ce3d615a5d87c462c6fde8825133f9a5
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Sat Apr 9 18:09:05 2022 +0800
[IOTDB-2827] Batch insert in new cluster (#5412)
---
.../iotdb/cluster/coordinator/Coordinator.java | 26 +-
.../cluster/log/applier/AsyncDataLogApplier.java | 10 +-
.../iotdb/cluster/log/applier/DataLogApplier.java | 14 +-
.../iotdb/cluster/metadata/CSchemaProcessor.java | 20 +-
.../iotdb/cluster/query/ClusterPlanRouter.java | 59 +++--
.../cluster/server/member/DataGroupMember.java | 9 +-
.../iotdb/commons/partition/DataPartition.java | 4 +-
.../iotdb/db/engine/cq/ContinuousQueryTask.java | 4 +-
.../db/mpp/common/schematree/PathPatternTree.java | 2 +-
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 153 +++++++++++-
.../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 8 +
.../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java | 8 +
.../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java | 5 +
.../mpp/sql/analyze/StandaloneSchemaFetcher.java | 8 +
.../db/mpp/sql/parser/StatementGenerator.java | 151 +++++++++++-
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 94 ++++++++
.../db/mpp/sql/planner/plan/node/PlanNodeType.java | 9 +-
.../plan/node/write/InsertMultiTabletNode.java | 69 ------
.../plan/node/write/InsertMultiTabletsNode.java | 164 +++++++++++++
.../planner/plan/node/write/InsertRowsNode.java | 76 +++++-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 72 +++++-
.../db/mpp/sql/statement/StatementVisitor.java | 19 +-
.../sql/statement/crud/InsertBaseStatement.java | 3 -
.../crud/InsertMultiTabletsStatement.java | 80 +++++++
.../mpp/sql/statement/crud/InsertRowStatement.java | 1 -
.../crud/InsertRowsOfOneDeviceStatement.java | 90 +++++++
.../sql/statement/crud/InsertRowsStatement.java | 87 +++++++
.../sql/statement/crud/InsertTabletStatement.java | 6 +-
.../apache/iotdb/db/qp/executor/IPlanExecutor.java | 4 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 59 ++---
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 11 +-
...TabletPlan.java => InsertMultiTabletsPlan.java} | 24 +-
.../db/service/thrift/impl/TSServiceImpl.java | 261 +++++++++++++++++++--
.../db/metadata/idtable/InsertWithIDTableTest.java | 7 +-
.../db/qp/physical/InsertTabletMultiPlanTest.java | 19 +-
35 files changed, 1373 insertions(+), 263 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index b779cb023a..7d7230b49e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -48,7 +48,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -445,7 +445,7 @@ public class Coordinator {
TSStatus status;
// need to create substatus for multiPlan
- // InsertTabletPlan, InsertMultiTabletPlan, InsertRowsPlan and CreateMultiTimeSeriesPlan
+ // InsertTabletPlan, InsertMultiTabletsPlan, InsertRowsPlan and CreateMultiTimeSeriesPlan
// contains many rows,
// each will correspond to a TSStatus as its execution result,
// as the plan is split and the sub-plans may have interleaving ranges,
@@ -453,7 +453,7 @@ public class Coordinator {
// e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to NodeA and row2
// belongs to NodeB, when NodeA returns a success while NodeB returns a failure, the
// failure and success should be placed into proper positions in TSStatus.subStatus
- if (plan instanceof InsertMultiTabletPlan
+ if (plan instanceof InsertMultiTabletsPlan
|| plan instanceof CreateMultiTimeSeriesPlan
|| plan instanceof InsertRowsPlan) {
status = forwardMultiSubPlan(planGroupMap, plan);
@@ -567,11 +567,11 @@ public class Coordinator {
(tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) || isBatchFailure;
if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
|| tmpStatus.isSetRedirectNode() && !(parentPlan instanceof CreateMultiTimeSeriesPlan)) {
- if (parentPlan instanceof InsertMultiTabletPlan) {
+ if (parentPlan instanceof InsertMultiTabletsPlan) {
// the subStatus is the two-dimensional array,
// The first dimension is the number of InsertTabletPlans,
// and the second dimension is the number of rows per InsertTabletPlan
- totalRowNum = ((InsertMultiTabletPlan) parentPlan).getTabletsSize();
+ totalRowNum = ((InsertMultiTabletsPlan) parentPlan).getTabletsSize();
} else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
totalRowNum = parentPlan.getPaths().size();
} else if (parentPlan instanceof InsertRowsPlan) {
@@ -583,12 +583,12 @@ public class Coordinator {
Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
}
// set the status from one group to the proper positions of the overall status
- if (parentPlan instanceof InsertMultiTabletPlan) {
- InsertMultiTabletPlan tmpMultiTabletPlan = ((InsertMultiTabletPlan) entry.getKey());
+ if (parentPlan instanceof InsertMultiTabletsPlan) {
+ InsertMultiTabletsPlan tmpMultiTabletPlan = ((InsertMultiTabletsPlan) entry.getKey());
for (int i = 0; i < tmpMultiTabletPlan.getInsertTabletPlanList().size(); i++) {
InsertTabletPlan tmpInsertTabletPlan = tmpMultiTabletPlan.getInsertTabletPlan(i);
int parentIndex = tmpMultiTabletPlan.getParentIndex(i);
- int parentPlanRowCount = ((InsertMultiTabletPlan) parentPlan).getRowCount(parentIndex);
+ int parentPlanRowCount = ((InsertMultiTabletsPlan) parentPlan).getRowCount(parentIndex);
if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
subStatus[parentIndex] = tmpStatus.subStatus.get(i);
if (tmpStatus.subStatus.get(i).getCode()
@@ -610,7 +610,7 @@ public class Coordinator {
if (tmpStatus.isSetRedirectNode()) {
if (tmpStatus.isSetRedirectNode()
&& tmpInsertTabletPlan.getMaxTime()
- == ((InsertMultiTabletPlan) parentPlan)
+ == ((InsertMultiTabletsPlan) parentPlan)
.getInsertTabletPlan(parentIndex)
.getMaxTime()) {
subStatus[parentIndex].setRedirectNode(tmpStatus.redirectNode);
@@ -620,7 +620,7 @@ public class Coordinator {
} else if (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (tmpStatus.isSetRedirectNode()
&& tmpInsertTabletPlan.getMaxTime()
- == ((InsertMultiTabletPlan) parentPlan)
+ == ((InsertMultiTabletsPlan) parentPlan)
.getInsertTabletPlan(parentIndex)
.getMaxTime()) {
subStatus[parentIndex] =
@@ -687,8 +687,8 @@ public class Coordinator {
boolean isBatchFailure,
TSStatus[] subStatus,
List<String> errorCodePartitionGroups) {
- if (parentPlan instanceof InsertMultiTabletPlan
- && !((InsertMultiTabletPlan) parentPlan).getResults().isEmpty()) {
+ if (parentPlan instanceof InsertMultiTabletsPlan
+ && !((InsertMultiTabletsPlan) parentPlan).getResults().isEmpty()) {
if (subStatus == null) {
subStatus = new TSStatus[totalRowNum];
Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
@@ -696,7 +696,7 @@ public class Coordinator {
noFailure = false;
isBatchFailure = true;
for (Map.Entry<Integer, TSStatus> integerTSStatusEntry :
- ((InsertMultiTabletPlan) parentPlan).getResults().entrySet()) {
+ ((InsertMultiTabletsPlan) parentPlan).getResults().entrySet()) {
subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index 576f213a9f..0f4a370e40 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -139,9 +139,9 @@ public class AsyncDataLogApplier implements LogApplier {
}
/**
- * We can sure that the storage group of all InsertTabletPlans in InsertMultiTabletPlan are the
+ * We can sure that the storage group of all InsertTabletPlans in InsertMultiTabletsPlan are the
* same. this is done in {@link
- * org.apache.iotdb.cluster.query.ClusterPlanRouter#splitAndRoutePlan(InsertMultiTabletPlan)}
+ * org.apache.iotdb.cluster.query.ClusterPlanRouter#splitAndRoutePlan(InsertMultiTabletsPlan)}
*
* <p>We can also sure that the storage group of all InsertRowPlans in InsertRowsPlan are the
* same. this is done in {@link
@@ -152,8 +152,8 @@ public class AsyncDataLogApplier implements LogApplier {
*/
private PartialPath getPlanSG(PhysicalPlan plan) throws StorageGroupNotSetException {
PartialPath sgPath = null;
- if (plan instanceof InsertMultiTabletPlan) {
- PartialPath deviceId = ((InsertMultiTabletPlan) plan).getFirstDeviceId();
+ if (plan instanceof InsertMultiTabletsPlan) {
+ PartialPath deviceId = ((InsertMultiTabletsPlan) plan).getFirstDeviceId();
sgPath = IoTDB.schemaProcessor.getBelongedStorageGroup(deviceId);
} else if (plan instanceof InsertRowsPlan) {
PartialPath path = ((InsertRowsPlan) plan).getFirstDeviceId();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index f1c5c39f6b..6fe1f08eeb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -36,12 +36,8 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.service.IoTDB;
@@ -112,8 +108,8 @@ public class DataLogApplier extends BaseApplier {
} else if (plan instanceof DeleteTimeSeriesPlan) {
((DeleteTimeSeriesPlan) plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
}
- if (plan instanceof InsertMultiTabletPlan) {
- applyInsert((InsertMultiTabletPlan) plan);
+ if (plan instanceof InsertMultiTabletsPlan) {
+ applyInsert((InsertMultiTabletsPlan) plan);
} else if (plan instanceof InsertRowsPlan) {
applyInsert((InsertRowsPlan) plan);
} else if (plan instanceof InsertPlan) {
@@ -123,7 +119,7 @@ public class DataLogApplier extends BaseApplier {
}
}
- private void applyInsert(InsertMultiTabletPlan plan)
+ private void applyInsert(InsertMultiTabletsPlan plan)
throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
boolean hasSync = false;
for (InsertTabletPlan insertTabletPlan : plan.getInsertTabletPlanList()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CSchemaProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CSchemaProcessor.java
index 308e123442..cccf1bfe37 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CSchemaProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CSchemaProcessor.java
@@ -60,12 +60,8 @@ import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -603,14 +599,14 @@ public class CSchemaProcessor extends LocalSchemaProcessor {
}
/**
- * @param insertMultiTabletPlan the InsertMultiTabletPlan
- * @return true if all InsertTabletPlan in InsertMultiTabletPlan create timeseries success,
+ * @param insertMultiTabletsPlan the InsertMultiTabletsPlan
+ * @return true if all InsertTabletPlan in InsertMultiTabletsPlan create timeseries success,
* otherwise false
*/
- public boolean createTimeseries(InsertMultiTabletPlan insertMultiTabletPlan)
+ public boolean createTimeseries(InsertMultiTabletsPlan insertMultiTabletsPlan)
throws CheckConsistencyException, IllegalPathException {
boolean allSuccess = true;
- for (InsertTabletPlan insertTabletPlan : insertMultiTabletPlan.getInsertTabletPlanList()) {
+ for (InsertTabletPlan insertTabletPlan : insertMultiTabletsPlan.getInsertTabletPlanList()) {
boolean success = createTimeseries(insertTabletPlan);
allSuccess = allSuccess && success;
if (!success) {
@@ -663,8 +659,8 @@ public class CSchemaProcessor extends LocalSchemaProcessor {
*/
public boolean createTimeseries(InsertPlan insertPlan)
throws IllegalPathException, CheckConsistencyException {
- if (insertPlan instanceof InsertMultiTabletPlan) {
- return createTimeseries((InsertMultiTabletPlan) insertPlan);
+ if (insertPlan instanceof InsertMultiTabletsPlan) {
+ return createTimeseries((InsertMultiTabletsPlan) insertPlan);
}
if (insertPlan instanceof InsertRowsPlan) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index 3dfa431156..1279c2da4d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -34,11 +34,8 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
@@ -122,8 +119,8 @@ public class ClusterPlanRouter {
return splitAndRoutePlan((InsertRowsPlan) plan);
} else if (plan instanceof InsertTabletPlan) {
return splitAndRoutePlan((InsertTabletPlan) plan);
- } else if (plan instanceof InsertMultiTabletPlan) {
- return splitAndRoutePlan((InsertMultiTabletPlan) plan);
+ } else if (plan instanceof InsertMultiTabletsPlan) {
+ return splitAndRoutePlan((InsertMultiTabletsPlan) plan);
} else if (plan instanceof CreateTimeSeriesPlan) {
return splitAndRoutePlan((CreateTimeSeriesPlan) plan);
} else if (plan instanceof CreateAlignedTimeSeriesPlan) {
@@ -198,18 +195,18 @@ public class ClusterPlanRouter {
}
/**
- * @param plan InsertMultiTabletPlan
- * @return key is InsertMultiTabletPlan, value is the partition group the plan belongs to, all
- * InsertTabletPlans in InsertMultiTabletPlan belongs to one same storage group.
+ * @param plan InsertMultiTabletsPlan
+ * @return key is InsertMultiTabletsPlan, value is the partition group the plan belongs to, all
+ * InsertTabletPlans in InsertMultiTabletsPlan belong to the same storage group.
*/
- private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertMultiTabletPlan plan)
+ private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertMultiTabletsPlan plan)
throws MetadataException {
/*
* the key of pgSgPathPlanMap is the partition group; the value is one map,
- * the key of the map is storage group, the value is the InsertMultiTabletPlan,
- * all InsertTabletPlans in InsertMultiTabletPlan belongs to one same storage group.
+ * the key of the map is storage group, the value is the InsertMultiTabletsPlan,
+ * all InsertTabletPlans in InsertMultiTabletsPlan belong to the same storage group.
*/
- Map<PartitionGroup, Map<PartialPath, InsertMultiTabletPlan>> pgSgPathPlanMap = new HashMap<>();
+ Map<PartitionGroup, Map<PartialPath, InsertMultiTabletsPlan>> pgSgPathPlanMap = new HashMap<>();
for (int i = 0; i < plan.getInsertTabletPlanList().size(); i++) {
InsertTabletPlan insertTabletPlan = plan.getInsertTabletPlanList().get(i);
Map<PhysicalPlan, PartitionGroup> tmpResult = splitAndRoutePlan(insertTabletPlan);
@@ -220,49 +217,49 @@ public class ClusterPlanRouter {
// 1.1 the sg that the plan(actually calculated based on device) belongs to
PartialPath tmpSgPath =
IoTDB.schemaProcessor.getBelongedStorageGroup(tmpPlan.getDevicePath());
- Map<PartialPath, InsertMultiTabletPlan> sgPathPlanMap = pgSgPathPlanMap.get(tmpPg);
+ Map<PartialPath, InsertMultiTabletsPlan> sgPathPlanMap = pgSgPathPlanMap.get(tmpPg);
if (sgPathPlanMap == null) {
- // 2.1 construct the InsertMultiTabletPlan
+ // 2.1 construct the InsertMultiTabletsPlan
List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
List<Integer> parentInsetTablePlanIndexList = new ArrayList<>();
insertTabletPlanList.add(tmpPlan);
parentInsetTablePlanIndexList.add(i);
- InsertMultiTabletPlan insertMultiTabletPlan =
- new InsertMultiTabletPlan(insertTabletPlanList, parentInsetTablePlanIndexList);
+ InsertMultiTabletsPlan insertMultiTabletsPlan =
+ new InsertMultiTabletsPlan(insertTabletPlanList, parentInsetTablePlanIndexList);
// 2.2 construct the sgPathPlanMap
sgPathPlanMap = new HashMap<>();
- sgPathPlanMap.put(tmpSgPath, insertMultiTabletPlan);
+ sgPathPlanMap.put(tmpSgPath, insertMultiTabletsPlan);
// 2.3 put the sgPathPlanMap to the pgSgPathPlanMap
pgSgPathPlanMap.put(tmpPg, sgPathPlanMap);
} else {
- InsertMultiTabletPlan insertMultiTabletPlan = sgPathPlanMap.get(tmpSgPath);
- if (insertMultiTabletPlan == null) {
+ InsertMultiTabletsPlan insertMultiTabletsPlan = sgPathPlanMap.get(tmpSgPath);
+ if (insertMultiTabletsPlan == null) {
List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
List<Integer> parentInsetTablePlanIndexList = new ArrayList<>();
insertTabletPlanList.add(tmpPlan);
parentInsetTablePlanIndexList.add(i);
- insertMultiTabletPlan =
- new InsertMultiTabletPlan(insertTabletPlanList, parentInsetTablePlanIndexList);
+ insertMultiTabletsPlan =
+ new InsertMultiTabletsPlan(insertTabletPlanList, parentInsetTablePlanIndexList);
- // 2.4 put the insertMultiTabletPlan to the tmpSgPath
- sgPathPlanMap.put(tmpSgPath, insertMultiTabletPlan);
+ // 2.4 put the insertMultiTabletsPlan to the tmpSgPath
+ sgPathPlanMap.put(tmpSgPath, insertMultiTabletsPlan);
} else {
- // 2.5 just add the tmpPlan to the insertMultiTabletPlan
- insertMultiTabletPlan.addInsertTabletPlan(tmpPlan, i);
+ // 2.5 just add the tmpPlan to the insertMultiTabletsPlan
+ insertMultiTabletsPlan.addInsertTabletPlan(tmpPlan, i);
}
}
}
}
Map<PhysicalPlan, PartitionGroup> result = new HashMap<>(pgSgPathPlanMap.values().size());
- for (Map.Entry<PartitionGroup, Map<PartialPath, InsertMultiTabletPlan>> pgMapEntry :
+ for (Map.Entry<PartitionGroup, Map<PartialPath, InsertMultiTabletsPlan>> pgMapEntry :
pgSgPathPlanMap.entrySet()) {
PartitionGroup pg = pgMapEntry.getKey();
- Map<PartialPath, InsertMultiTabletPlan> sgPathPlanMap = pgMapEntry.getValue();
- // All InsertTabletPlan in InsertMultiTabletPlan belong to the same storage group
- for (Map.Entry<PartialPath, InsertMultiTabletPlan> sgPathEntry : sgPathPlanMap.entrySet()) {
+ Map<PartialPath, InsertMultiTabletsPlan> sgPathPlanMap = pgMapEntry.getValue();
+ // All InsertTabletPlan in InsertMultiTabletsPlan belong to the same storage group
+ for (Map.Entry<PartialPath, InsertMultiTabletsPlan> sgPathEntry : sgPathPlanMap.entrySet()) {
result.put(sgPathEntry.getValue(), pg);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 66d8143cce..47f88b6cab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -726,7 +726,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
try {
if (plan instanceof InsertPlan
&& ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
- if (plan instanceof InsertRowsPlan || plan instanceof InsertMultiTabletPlan) {
+ if (plan instanceof InsertRowsPlan || plan instanceof InsertMultiTabletsPlan) {
if (e instanceof BatchProcessException) {
for (TSStatus status : ((BatchProcessException) e).getFailingStatus()) {
if (status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
@@ -803,7 +803,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
try {
if (plan instanceof InsertPlan
&& ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
- if (plan instanceof InsertRowsPlan || plan instanceof InsertMultiTabletPlan) {
+ if (plan instanceof InsertRowsPlan || plan instanceof InsertMultiTabletsPlan) {
if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (TSStatus tmpStatus : status.getSubStatus()) {
if (tmpStatus.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
@@ -848,8 +848,9 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
throws CheckConsistencyException, IllegalPathException {
logger.debug("create time series for failed insertion {}", plan);
// apply measurements according to failed measurements
- if (plan instanceof InsertMultiTabletPlan) {
- for (InsertTabletPlan insertPlan : ((InsertMultiTabletPlan) plan).getInsertTabletPlanList()) {
+ if (plan instanceof InsertMultiTabletsPlan) {
+ for (InsertTabletPlan insertPlan :
+ ((InsertMultiTabletsPlan) plan).getInsertTabletPlanList()) {
if (insertPlan.getFailedMeasurements() != null) {
insertPlan.getPlanFromFailed();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 3203d27a12..acd8a5e3c8 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -49,7 +49,7 @@ public class DataPartition {
}
public List<RegionReplicaSet> getDataRegionReplicaSetForWriting(
- String deviceName, List<TimePartitionSlot> timePartitionIdList) {
+ String deviceName, List<TimePartitionSlot> timePartitionSlotList) {
// A list of data region replica sets will store data in a same time partition.
// We will insert data to the last set in the list.
// TODO return the latest dataRegionReplicaSet for each time partition
@@ -57,7 +57,7 @@ public class DataPartition {
}
public RegionReplicaSet getDataRegionReplicaSetForWriting(
- String deviceName, TimePartitionSlot timePartitionIdList) {
+ String deviceName, TimePartitionSlot timePartitionSlot) {
// A list of data region replica sets will store data in a same time partition.
// We will insert data to the last set in the list.
// TODO return the latest dataRegionReplicaSet for each time partition
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
index 39a027d0c1..44df581504 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.qp.strategy.LogicalGenerator;
@@ -151,7 +151,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
if (insertTabletPlans.isEmpty()) {
continue;
}
- if (!serviceProvider.executeNonQuery(new InsertMultiTabletPlan(insertTabletPlans))) {
+ if (!serviceProvider.executeNonQuery(new InsertMultiTabletsPlan(insertTabletPlans))) {
throw new ContinuousQueryException(
String.format(
"failed to execute cq task %s, sql: %s",
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
index 1b07f6790d..6f6b281c6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
@@ -47,7 +47,7 @@ public class PathPatternTree {
// TODO
this.root = new PathPatternNode(SQLConstant.ROOT);
this.pathList = new ArrayList<>();
- };
+ }
public PathPatternTree(Map<PartialPath, List<String>> devices) {
// TODO
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index a8a61e7f9f..4376baaa23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -41,10 +41,7 @@ import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
-import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
@@ -363,6 +360,7 @@ public class Analyzer {
return analysis;
}
+ @Override
public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
@@ -398,5 +396,152 @@ public class Analyzer {
return analysis;
}
+
+ @Override
+ public Analysis visitInsertRows(
+ InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+ // TODO remove duplicate
+ List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+ for (InsertRowStatement insertRowStatement :
+ insertRowsStatement.getInsertRowStatementList()) {
+ DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(
+ insertRowStatement.getTimePartitionSlots());
+ dataPartitionQueryParams.add(dataPartitionQueryParam);
+ }
+
+ PartitionInfo partitionInfo = partitionFetcher.fetchPartitionInfos(dataPartitionQueryParams);
+
+ SchemaTree schemaTree = null;
+ if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+ schemaTree =
+ schemaFetcher.fetchSchemaListWithAutoCreate(
+ insertRowsStatement.getDevicePaths(),
+ insertRowsStatement.getMeasurementsList(),
+ insertRowsStatement.getDataTypesList());
+ } else {
+ PathPatternTree patternTree = new PathPatternTree();
+ for (InsertRowStatement insertRowStatement :
+ insertRowsStatement.getInsertRowStatementList()) {
+ patternTree.appendPaths(
+ insertRowStatement.getDevicePath(),
+ Arrays.asList(insertRowStatement.getMeasurements()));
+ }
+ schemaFetcher.fetchSchema(patternTree);
+ }
+ Analysis analysis = new Analysis();
+ analysis.setSchemaTree(schemaTree);
+
+ try {
+ insertRowsStatement.transferType(schemaTree);
+ } catch (QueryProcessException e) {
+ throw new SemanticException(e.getMessage());
+ }
+
+ if (!insertRowsStatement.checkDataType(schemaTree)) {
+ throw new SemanticException("Data type mismatch");
+ }
+
+ analysis.setStatement(insertRowsStatement);
+ analysis.setDataPartitionInfo(partitionInfo.getDataPartitionInfo());
+ analysis.setSchemaPartitionInfo(partitionInfo.getSchemaPartitionInfo());
+
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitInsertMultiTablets(
+ InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
+ // TODO remove duplicate
+ List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+ for (InsertTabletStatement insertTabletStatement :
+ insertMultiTabletsStatement.getInsertTabletStatementList()) {
+ DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(
+ insertTabletStatement.getTimePartitionSlots());
+ dataPartitionQueryParams.add(dataPartitionQueryParam);
+ }
+
+ PartitionInfo partitionInfo = partitionFetcher.fetchPartitionInfos(dataPartitionQueryParams);
+
+ SchemaTree schemaTree = null;
+ if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+ schemaTree =
+ schemaFetcher.fetchSchemaListWithAutoCreate(
+ insertMultiTabletsStatement.getDevicePaths(),
+ insertMultiTabletsStatement.getMeasurementsList(),
+ insertMultiTabletsStatement.getDataTypesList());
+ } else {
+ PathPatternTree patternTree = new PathPatternTree();
+ for (InsertTabletStatement insertTabletStatement :
+ insertMultiTabletsStatement.getInsertTabletStatementList()) {
+ patternTree.appendPaths(
+ insertTabletStatement.getDevicePath(),
+ Arrays.asList(insertTabletStatement.getMeasurements()));
+ }
+ schemaFetcher.fetchSchema(patternTree);
+ }
+ Analysis analysis = new Analysis();
+ analysis.setSchemaTree(schemaTree);
+
+ if (!insertMultiTabletsStatement.checkDataType(schemaTree)) {
+ throw new SemanticException("Data type mismatch");
+ }
+ analysis.setStatement(insertMultiTabletsStatement);
+ analysis.setDataPartitionInfo(partitionInfo.getDataPartitionInfo());
+ analysis.setSchemaPartitionInfo(partitionInfo.getSchemaPartitionInfo());
+
+ return analysis;
+ }
+
+ @Override
+ public Analysis visitInsertRowsOfOneDevice(
+ InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
+ DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(
+ insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(
+ insertRowsOfOneDeviceStatement.getTimePartitionSlots());
+
+ PartitionInfo partitionInfo = partitionFetcher.fetchPartitionInfo(dataPartitionQueryParam);
+
+ SchemaTree schemaTree = null;
+ if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+ schemaTree =
+ schemaFetcher.fetchSchemaWithAutoCreate(
+ insertRowsOfOneDeviceStatement.getDevicePath(),
+ insertRowsOfOneDeviceStatement.getMeasurements(),
+ insertRowsOfOneDeviceStatement.getDataTypes());
+ } else {
+ PathPatternTree patternTree = new PathPatternTree();
+ for (InsertRowStatement insertRowStatement :
+ insertRowsOfOneDeviceStatement.getInsertRowStatementList()) {
+ patternTree.appendPaths(
+ insertRowStatement.getDevicePath(),
+ Arrays.asList(insertRowStatement.getMeasurements()));
+ }
+ schemaFetcher.fetchSchema(patternTree);
+ }
+ Analysis analysis = new Analysis();
+ analysis.setSchemaTree(schemaTree);
+
+ try {
+ insertRowsOfOneDeviceStatement.transferType(schemaTree);
+ } catch (QueryProcessException e) {
+ throw new SemanticException(e.getMessage());
+ }
+
+ if (!insertRowsOfOneDeviceStatement.checkDataType(schemaTree)) {
+ throw new SemanticException("Data type mismatch");
+ }
+
+ analysis.setStatement(insertRowsOfOneDeviceStatement);
+ analysis.setDataPartitionInfo(partitionInfo.getDataPartitionInfo());
+ analysis.setSchemaPartitionInfo(partitionInfo.getSchemaPartitionInfo());
+
+ return analysis;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 2af452c50a..cd175e98d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.util.List;
+
public class ClusterSchemaFetcher implements ISchemaFetcher {
@Override
@@ -35,4 +37,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
return null;
}
+
+ @Override
+ public SchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePath, List<String[]> measurements, List<TSDataType[]> tsDataTypes) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
index 3b803eff8e..ed23e6e4d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.util.List;
+
public class FakeSchemaFetcherImpl implements ISchemaFetcher {
@Override
public SchemaTree fetchSchema(PathPatternTree patternTree) {
@@ -35,4 +37,10 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
return null;
}
+
+ @Override
+ public SchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePath, List<String[]> measurements, List<TSDataType[]> tsDataTypes) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
index f7b5d34d85..bad8eaf9f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.util.List;
+
/**
* This interface is used to fetch the metadata information required in execution plan generating.
*/
@@ -33,4 +35,7 @@ public interface ISchemaFetcher {
SchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes);
+
+ SchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePath, List<String[]> measurements, List<TSDataType[]> tsDataTypes);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
index f6f10e9216..403c9008d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.util.List;
+
public class StandaloneSchemaFetcher implements ISchemaFetcher {
private StandaloneSchemaFetcher() {}
@@ -41,4 +43,10 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
return null;
}
+
+ @Override
+ public SchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePath, List<String[]> measurements, List<TSDataType[]> tsDataTypes) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java
index 1bbd2b59fe..33d24c1974 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java
@@ -50,7 +50,9 @@ import org.antlr.v4.runtime.atn.PredictionMode;
import org.antlr.v4.runtime.tree.ParseTree;
import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME;
@@ -148,13 +150,27 @@ public class StatementGenerator {
insertStatement.setDevicePath(new PartialPath(insertRecordReq.getPrefixPath()));
insertStatement.setTime(insertRecordReq.getTimestamp());
- // insertStatement.setValuesList(insertRecordReq.getValues());
insertStatement.fillValues(insertRecordReq.values);
insertStatement.setMeasurements(insertRecordReq.getMeasurements().toArray(new String[0]));
insertStatement.setAligned(insertRecordReq.isAligned);
return insertStatement;
}
+ public static Statement createStatement(TSInsertStringRecordReq insertRecordReq)
+ throws IllegalPathException, QueryProcessException {
+ // construct insert statement
+ InsertRowStatement insertStatement = new InsertRowStatement();
+ insertStatement.setDevicePath(new PartialPath(insertRecordReq.getPrefixPath()));
+ insertStatement.setTime(insertRecordReq.getTimestamp());
+ insertStatement.setMeasurements(insertRecordReq.getMeasurements().toArray(new String[0]));
+ insertStatement.setDataTypes(new TSDataType[insertStatement.getMeasurements().length]);
+ insertStatement.setValues(insertRecordReq.getValues().toArray(new Object[0]));
+ insertStatement.setNeedInferType(true);
+ insertStatement.setAligned(insertRecordReq.isAligned);
+
+ return insertStatement;
+ }
+
public static Statement createStatement(TSInsertTabletReq insertTabletReq)
throws IllegalPathException {
// construct insert statement
@@ -182,6 +198,121 @@ public class StatementGenerator {
return insertStatement;
}
+ public static Statement createStatement(TSInsertTabletsReq req) throws IllegalPathException {
+ // construct insert statement
+ InsertMultiTabletsStatement insertStatement = new InsertMultiTabletsStatement();
+ List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
+ for (int i = 0; i < req.prefixPaths.size(); i++) {
+ InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
+ insertTabletStatement.setDevicePath(new PartialPath(req.prefixPaths.get(i)));
+ insertTabletStatement.setMeasurements(req.measurementsList.get(i).toArray(new String[0]));
+ insertTabletStatement.setTimes(
+ QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), req.sizeList.get(i)));
+ insertTabletStatement.setColumns(
+ QueryDataSetUtils.readValuesFromBuffer(
+ req.valuesList.get(i),
+ req.typesList.get(i),
+ req.measurementsList.get(i).size(),
+ req.sizeList.get(i)));
+ insertTabletStatement.setBitMaps(
+ QueryDataSetUtils.readBitMapsFromBuffer(
+ req.valuesList.get(i), req.measurementsList.get(i).size(), req.sizeList.get(i)));
+ insertTabletStatement.setRowCount(req.sizeList.get(i));
+ TSDataType[] dataTypes = new TSDataType[req.typesList.get(i).size()];
+ for (int j = 0; j < dataTypes.length; j++) {
+ dataTypes[j] = TSDataType.values()[req.typesList.get(i).get(j)];
+ }
+ insertTabletStatement.setDataTypes(dataTypes);
+ insertTabletStatement.setAligned(req.isAligned);
+
+ insertTabletStatementList.add(insertTabletStatement);
+ }
+
+ insertStatement.setInsertTabletStatementList(insertTabletStatementList);
+ return insertStatement;
+ }
+
+ public static Statement createStatement(TSInsertRecordsReq req)
+ throws IllegalPathException, QueryProcessException {
+ // construct insert statement
+ InsertRowsStatement insertStatement = new InsertRowsStatement();
+ List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+ for (int i = 0; i < req.prefixPaths.size(); i++) {
+ InsertRowStatement statement = new InsertRowStatement();
+ statement.setDevicePath(new PartialPath(req.getPrefixPaths().get(i)));
+ statement.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
+ statement.setTime(req.getTimestamps().get(i));
+ statement.fillValues(req.valuesList.get(i));
+ statement.setAligned(req.isAligned);
+ insertRowStatementList.add(statement);
+ }
+ insertStatement.setInsertRowStatementList(insertRowStatementList);
+ return insertStatement;
+ }
+
+ public static Statement createStatement(TSInsertStringRecordsReq req)
+ throws IllegalPathException, QueryProcessException {
+ // construct insert statement
+ InsertRowsStatement insertStatement = new InsertRowsStatement();
+ List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+ for (int i = 0; i < req.prefixPaths.size(); i++) {
+ InsertRowStatement statement = new InsertRowStatement();
+ statement.setDevicePath(new PartialPath(req.getPrefixPaths().get(i)));
+ addMeasurementAndValue(
+ statement, req.getMeasurementsList().get(i), req.getValuesList().get(i));
+ statement.setDataTypes(new TSDataType[statement.getMeasurements().length]);
+ statement.setTime(req.getTimestamps().get(i));
+ statement.setNeedInferType(true);
+ statement.setAligned(req.isAligned);
+
+ insertRowStatementList.add(statement);
+ }
+ insertStatement.setInsertRowStatementList(insertRowStatementList);
+ return insertStatement;
+ }
+
+ public static Statement createStatement(TSInsertRecordsOfOneDeviceReq req)
+ throws IllegalPathException, QueryProcessException {
+ // construct insert statement
+ InsertRowsOfOneDeviceStatement insertStatement = new InsertRowsOfOneDeviceStatement();
+ insertStatement.setDevicePath(new PartialPath(req.prefixPath));
+ List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+ for (int i = 0; i < req.timestamps.size(); i++) {
+ InsertRowStatement statement = new InsertRowStatement();
+ statement.setDevicePath(insertStatement.getDevicePath());
+ statement.setMeasurements(req.measurementsList.get(i).toArray(new String[0]));
+ statement.setTime(req.timestamps.get(i));
+ statement.fillValues(req.valuesList.get(i));
+ statement.setAligned(req.isAligned);
+
+ insertRowStatementList.add(statement);
+ }
+ insertStatement.setInsertRowStatementList(insertRowStatementList);
+ return insertStatement;
+ }
+
+ public static Statement createStatement(TSInsertStringRecordsOfOneDeviceReq req)
+ throws IllegalPathException, QueryProcessException {
+ // construct insert statement
+ InsertRowsOfOneDeviceStatement insertStatement = new InsertRowsOfOneDeviceStatement();
+ insertStatement.setDevicePath(new PartialPath(req.prefixPath));
+ List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+ for (int i = 0; i < req.timestamps.size(); i++) {
+ InsertRowStatement statement = new InsertRowStatement();
+ statement.setDevicePath(insertStatement.getDevicePath());
+ addMeasurementAndValue(
+ statement, req.getMeasurementsList().get(i), req.getValuesList().get(i));
+ statement.setDataTypes(new TSDataType[statement.getMeasurements().length]);
+ statement.setTime(req.timestamps.get(i));
+ statement.setNeedInferType(true);
+ statement.setAligned(req.isAligned);
+
+ insertRowStatementList.add(statement);
+ }
+ insertStatement.setInsertRowStatementList(insertRowStatementList);
+ return insertStatement;
+ }
+
public static Statement createStatement(TSCreateTimeseriesReq req) throws IllegalPathException {
// construct create timseries statement
CreateTimeSeriesStatement statement = new CreateTimeSeriesStatement();
@@ -242,4 +373,22 @@ public class StatementGenerator {
}
return astVisitor.visit(tree);
}
+
+ private static void addMeasurementAndValue(
+ InsertRowStatement insertRowStatement, List<String> measurements, List<String> values) {
+ List<String> newMeasurements = new ArrayList<>(measurements.size());
+ List<Object> newValues = new ArrayList<>(values.size());
+
+ for (int i = 0; i < measurements.size(); ++i) {
+ String value = values.get(i);
+ if (value.isEmpty()) {
+ continue;
+ }
+ newMeasurements.add(measurements.get(i));
+ newValues.add(value);
+ }
+
+ insertRowStatement.setValues(newValues.toArray(new Object[0]));
+ insertRowStatement.setMeasurements(newMeasurements.toArray(new String[0]));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index ae6e779fb4..2db6e48a2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -38,7 +38,9 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FillComponent;
@@ -46,6 +48,7 @@ import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
+import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.FillQueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
@@ -462,6 +465,97 @@ public class LogicalPlanner {
return null;
}
}
+
+ @Override
+ public PlanNode visitInsertRows(
+ InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+ // set schema in insert node
+ // convert insert statement to insert node
+ InsertRowsNode insertRowsNode = new InsertRowsNode(context.getQueryId().genPlanNodeId());
+ for (int i = 0; i < insertRowsStatement.getInsertRowStatementList().size(); i++) {
+ InsertRowStatement insertRowStatement =
+ insertRowsStatement.getInsertRowStatementList().get(i);
+ List<MeasurementSchema> measurementSchemas =
+ analysis
+ .getSchemaTree()
+ .searchMeasurementSchema(
+ insertRowStatement.getDevicePath(),
+ Arrays.asList(insertRowStatement.getMeasurements()));
+ insertRowsNode.addOneInsertRowNode(
+ new InsertRowNode(
+ insertRowsNode.getPlanNodeId(),
+ insertRowStatement.getDevicePath(),
+ insertRowStatement.isAligned(),
+ measurementSchemas.toArray(new MeasurementSchema[0]),
+ insertRowStatement.getDataTypes(),
+ insertRowStatement.getTime(),
+ insertRowStatement.getValues()),
+ i);
+ }
+ return insertRowsNode;
+ }
+
+ @Override
+ public PlanNode visitInsertMultiTablets(
+ InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
+ // set schema in insert node
+ // convert insert statement to insert node
+ InsertMultiTabletsNode insertMultiTabletsNode =
+ new InsertMultiTabletsNode(context.getQueryId().genPlanNodeId());
+ List<InsertTabletNode> insertTabletNodeList = new ArrayList<>();
+ for (int i = 0; i < insertMultiTabletsStatement.getInsertTabletStatementList().size(); i++) {
+ InsertTabletStatement insertTabletStatement =
+ insertMultiTabletsStatement.getInsertTabletStatementList().get(i);
+ List<MeasurementSchema> measurementSchemas =
+ analysis
+ .getSchemaTree()
+ .searchMeasurementSchema(
+ insertTabletStatement.getDevicePath(),
+ Arrays.asList(insertTabletStatement.getMeasurements()));
+ insertTabletNodeList.add(
+ new InsertTabletNode(
+ insertMultiTabletsNode.getPlanNodeId(),
+ insertTabletStatement.getDevicePath(),
+ insertTabletStatement.isAligned(),
+ measurementSchemas.toArray(new MeasurementSchema[0]),
+ insertTabletStatement.getDataTypes(),
+ insertTabletStatement.getTimes(),
+ insertTabletStatement.getBitMaps(),
+ insertTabletStatement.getColumns(),
+ insertTabletStatement.getRowCount()));
+ }
+ insertMultiTabletsNode.setInsertTabletNodeList(insertTabletNodeList);
+ return insertMultiTabletsNode;
+ }
+
+ @Override
+ public PlanNode visitInsertRowsOfOneDevice(
+ InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
+ // set schema in insert node
+ // convert insert statement to insert node
+ InsertRowsNode insertRowsNode = new InsertRowsNode(context.getQueryId().genPlanNodeId());
+ for (int i = 0; i < insertRowsOfOneDeviceStatement.getInsertRowStatementList().size(); i++) {
+ InsertRowStatement insertRowStatement =
+ insertRowsOfOneDeviceStatement.getInsertRowStatementList().get(i);
+ List<MeasurementSchema> measurementSchemas =
+ analysis
+ .getSchemaTree()
+ .searchMeasurementSchema(
+ insertRowStatement.getDevicePath(),
+ Arrays.asList(insertRowStatement.getMeasurements()));
+ insertRowsNode.addOneInsertRowNode(
+ new InsertRowNode(
+ insertRowsNode.getPlanNodeId(),
+ insertRowStatement.getDevicePath(),
+ insertRowStatement.isAligned(),
+ measurementSchemas.toArray(new MeasurementSchema[0]),
+ insertRowStatement.getDataTypes(),
+ insertRowStatement.getTime(),
+ insertRowStatement.getValues()),
+ i);
+ }
+ return insertRowsNode;
+ }
}
private class PlanBuilder {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 27f8c3d168..b331542334 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -38,11 +38,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertMultiTabletNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsOfOneDeviceNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertMultiTabletsNode;
import java.nio.ByteBuffer;
@@ -120,7 +117,7 @@ public enum PlanNodeType {
case 16:
return InsertRowsOfOneDeviceNode.deserialize(buffer);
case 17:
- return InsertMultiTabletNode.deserialize(buffer);
+ return InsertMultiTabletsNode.deserialize(buffer);
case 18:
return ShowDevicesNode.deserialize(buffer);
case 19:
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
deleted file mode 100644
index 48bd2f80c5..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
-
-import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class InsertMultiTabletNode extends InsertNode {
-
- protected InsertMultiTabletNode(PlanNodeId id) {
- super(id);
- }
-
- @Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
- return null;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return null;
- }
-
- @Override
- public void addChild(PlanNode child) {}
-
- @Override
- public PlanNode clone() {
- throw new NotImplementedException("clone of Insert is not implemented");
- }
-
- @Override
- public int allowedChildCount() {
- return NO_CHILD_ALLOWED;
- }
-
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
- public static InsertMultiTabletNode deserialize(ByteBuffer byteBuffer) {
- return null;
- }
-
- @Override
- public void serialize(ByteBuffer byteBuffer) {}
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
new file mode 100644
index 0000000000..f646cc9760
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
+
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class InsertMultiTabletsNode extends InsertNode {
+
+ /**
+ * The value is used to indicate the parent InsertTabletNode's index when the parent
+ * InsertTabletNode is split into multiple sub InsertTabletNodes. If the InsertTabletNode has no
+ * parent node, the value is zero.
+ *
+ * <p>suppose we originally have three InsertTabletNodes in one InsertMultiTabletsNode, then the
+ * initial InsertMultiTabletsNode would have the following two attributes:
+ *
+ * <p>insertTabletNodeList={InsertTabletNode_1,InsertTabletNode_2,InsertTabletNode_3}
+ *
+ * <p>parentInsertTabletNodeIndexList={0,0,0}; both lists have three values.
+ *
+ * <p>if the InsertTabletNode_1 is split into two sub InsertTabletNodes, InsertTabletNode_2 is
+ * split into three sub InsertTabletNodes, InsertTabletNode_3 is split into four sub
+ * InsertTabletNodes.
+ *
+ * <p>InsertTabletNode_1={InsertTabletNode_1_subPlan1, InsertTabletNode_1_subPlan2}
+ *
+ * <p>InsertTabletNode_2={InsertTabletNode_2_subPlan1, InsertTabletNode_2_subPlan2,
+ * InsertTabletNode_2_subPlan3}
+ *
+ * <p>InsertTabletNode_3={InsertTabletNode_3_subPlan1, InsertTabletNode_3_subPlan2,
+ * InsertTabletNode_3_subPlan3, InsertTabletNode_3_subPlan4}
+ *
+ * <p>Those sub nodes belong to two different data region replica sets, so two new
+ * InsertMultiTabletsNodes are generated:
+ *
+ * <p>InsertMultiTabletsNode_1.insertTabletNodeList={InsertTabletNode_1_subPlan1,
+ * InsertTabletNode_3_subPlan1, InsertTabletNode_3_subPlan3, InsertTabletNode_3_subPlan4}
+ *
+ * <p>InsertMultiTabletsNode_1.parentInsertTabletNodeIndexList={0,2,2,2}
+ *
+ * <p>InsertMultiTabletsNode_2.insertTabletNodeList={InsertTabletNode_1_subPlan2,
+ * InsertTabletNode_2_subPlan1, InsertTabletNode_2_subPlan2, InsertTabletNode_2_subPlan3,
+ * InsertTabletNode_3_subPlan2}
+ *
+ * <p>InsertMultiTabletsNode_2.parentInsertTabletNodeIndexList={0,1,1,1,2}
+ *
+ * <p>this is usually used to back-propagate exceptions to the parent plan without losing their
+ * proper positions.
+ */
+ List<Integer> parentInsertTabletNodeIndexList;
+
+ /** the InsertTabletNode list */
+ List<InsertTabletNode> insertTabletNodeList;
+
+ /** record the result of insert tablets */
+ private Map<Integer, TSStatus> results = new HashMap<>();
+
+ public InsertMultiTabletsNode(PlanNodeId id) {
+ super(id);
+ parentInsertTabletNodeIndexList = new ArrayList<>();
+ insertTabletNodeList = new ArrayList<>();
+ }
+
+ public List<Integer> getParentInsertTabletNodeIndexList() {
+ return parentInsertTabletNodeIndexList;
+ }
+
+ public void setParentInsertTabletNodeIndexList(List<Integer> parentInsertTabletNodeIndexList) {
+ this.parentInsertTabletNodeIndexList = parentInsertTabletNodeIndexList;
+ }
+
+ public List<InsertTabletNode> getInsertTabletNodeList() {
+ return insertTabletNodeList;
+ }
+
+ public void setInsertTabletNodeList(List<InsertTabletNode> insertTabletNodeList) {
+ this.insertTabletNodeList = insertTabletNodeList;
+ }
+
+ public void addInsertTabletNode(InsertTabletNode node, Integer parentIndex) {
+ insertTabletNodeList.add(node);
+ parentInsertTabletNodeIndexList.add(parentIndex);
+ }
+
+ @Override
+ public List<InsertNode> splitByPartition(Analysis analysis) {
+ Map<RegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
+ for (int i = 0; i < insertTabletNodeList.size(); i++) {
+ InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
+ List<InsertNode> tmpResult = insertTabletNode.splitByPartition(analysis);
+ for (InsertNode subNode : tmpResult) {
+ RegionReplicaSet dataRegionReplicaSet = subNode.getDataRegionReplicaSet();
+ if (splitMap.containsKey(dataRegionReplicaSet)) {
+ InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
+ tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i);
+ } else {
+ InsertMultiTabletsNode tmpNode = new InsertMultiTabletsNode(this.getPlanNodeId());
+ tmpNode.setDataRegionReplicaSet(dataRegionReplicaSet);
+ tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i);
+ splitMap.put(dataRegionReplicaSet, tmpNode);
+ }
+ }
+ }
+ return new ArrayList<>(splitMap.values());
+ }
+
+ public Map<Integer, TSStatus> getResults() {
+ return results;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public void addChild(PlanNode child) {}
+
+ @Override
+ public PlanNode clone() {
+ throw new NotImplementedException("clone of Insert is not implemented");
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return NO_CHILD_ALLOWED;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ public static InsertMultiTabletsNode deserialize(ByteBuffer byteBuffer) {
+ return null;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index 1f3b6c1ab1..92ce7684c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -18,18 +18,69 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class InsertRowsNode extends InsertNode {
- protected InsertRowsNode(PlanNodeId id) {
+ /**
+ * Suppose there is an InsertRowsNode which contains 5 InsertRowNodes,
+ * insertRowNodeList={InsertRowNode_0, InsertRowNode_1, InsertRowNode_2, InsertRowNode_3,
+ * InsertRowNode_4}; then insertRowNodeIndexList={0, 1, 2, 3, 4}. When the InsertRowsNode is
+ * split by data region replica set in cluster version, each sub-node keeps the original
+ * indexes of its rows: if InsertRowsNode_1's insertRowNodeList = {InsertRowNode_0,
+ * InsertRowNode_3, InsertRowNode_4}, then InsertRowsNode_1's insertRowNodeIndexList = {0, 3, 4};
+ * and if InsertRowsNode_2's insertRowNodeList = {InsertRowNode_1, InsertRowNode_2}, then
+ * InsertRowsNode_2's insertRowNodeIndexList = {1, 2}.
+ */
+ private List<Integer> insertRowNodeIndexList;
+
+ /** the InsertRowNode list */
+ private List<InsertRowNode> insertRowNodeList;
+
+ public InsertRowsNode(PlanNodeId id) {
super(id);
+ insertRowNodeList = new ArrayList<>();
+ insertRowNodeIndexList = new ArrayList<>();
+ }
+
+ /** record the result of insert rows */
+ private Map<Integer, TSStatus> results = new HashMap<>();
+
+ public List<Integer> getInsertRowNodeIndexList() {
+ return insertRowNodeIndexList;
+ }
+
+ public void setInsertRowNodeIndexList(List<Integer> insertRowNodeIndexList) {
+ this.insertRowNodeIndexList = insertRowNodeIndexList;
+ }
+
+ public List<InsertRowNode> getInsertRowNodeList() {
+ return insertRowNodeList;
+ }
+
+ public void setInsertRowNodeList(List<InsertRowNode> insertRowNodeList) {
+ this.insertRowNodeList = insertRowNodeList;
+ }
+
+ public void addOneInsertRowNode(InsertRowNode node, int index) {
+ insertRowNodeList.add(node);
+ insertRowNodeIndexList.add(index);
+ }
+
+ public Map<Integer, TSStatus> getResults() {
+ return results;
}
@Override
@@ -64,6 +115,27 @@ public class InsertRowsNode extends InsertNode {
@Override
public List<InsertNode> splitByPartition(Analysis analysis) {
- return null;
+ Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
+ for (int i = 0; i < insertRowNodeList.size(); i++) {
+ InsertRowNode insertRowNode = insertRowNodeList.get(i);
+ // data region for insert row node
+ RegionReplicaSet dataRegionReplicaSet =
+ analysis
+ .getDataPartitionInfo()
+ .getDataRegionReplicaSetForWriting(
+ insertRowNode.devicePath.getFullPath(),
+ StorageEngine.getTimePartitionSlot(insertRowNode.getTime()));
+ if (splitMap.containsKey(dataRegionReplicaSet)) {
+ InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
+ tmpNode.addOneInsertRowNode(insertRowNode, i);
+ } else {
+ InsertRowsNode tmpNode = new InsertRowsNode(this.getPlanNodeId());
+ tmpNode.setDataRegionReplicaSet(dataRegionReplicaSet);
+ tmpNode.addOneInsertRowNode(insertRowNode, i);
+ splitMap.put(dataRegionReplicaSet, tmpNode);
+ }
+ }
+
+ return new ArrayList<>(splitMap.values());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 0c8d78fc7b..e9adb763dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -18,18 +18,65 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class InsertRowsOfOneDeviceNode extends InsertNode {
- protected InsertRowsOfOneDeviceNode(PlanNodeId id) {
+ /**
+ * Suppose there is an InsertRowsOfOneDeviceNode which contains 5 InsertRowNodes,
+ * insertRowNodeList={InsertRowNode_0, InsertRowNode_1, InsertRowNode_2, InsertRowNode_3,
+ * InsertRowNode_4}; then insertRowNodeIndexList={0, 1, 2, 3, 4}. When the
+ * InsertRowsOfOneDeviceNode is split by data region replica set in cluster version, each
+ * sub-node keeps the original indexes of its rows (note that splitByPartition emits the
+ * sub-nodes as InsertRowsNodes). For example, if sub-node 1's insertRowNodeList =
+ * {InsertRowNode_0, InsertRowNode_3, InsertRowNode_4}, then its insertRowNodeIndexList =
+ * {0, 3, 4}; and if sub-node 2's insertRowNodeList = {InsertRowNode_1, InsertRowNode_2}, then
+ * its insertRowNodeIndexList = {1, 2}.
+ */
+ private List<Integer> insertRowNodeIndexList;
+
+ /** the InsertRowNode list */
+ private List<InsertRowNode> insertRowNodeList;
+
+ /** record the result of insert rows */
+ private Map<Integer, TSStatus> results = new HashMap<>();
+
+ public InsertRowsOfOneDeviceNode(PlanNodeId id) {
super(id);
+ insertRowNodeIndexList = new ArrayList<>();
+ insertRowNodeList = new ArrayList<>();
+ }
+
+ public Map<Integer, TSStatus> getResults() {
+ return results;
+ }
+
+ public List<Integer> getInsertRowNodeIndexList() {
+ return insertRowNodeIndexList;
+ }
+
+ public void setInsertRowNodeIndexList(List<Integer> insertRowNodeIndexList) {
+ this.insertRowNodeIndexList = insertRowNodeIndexList;
+ }
+
+ public List<InsertRowNode> getInsertRowNodeList() {
+ return insertRowNodeList;
+ }
+
+ public void setInsertRowNodeList(List<InsertRowNode> insertRowNodeList) {
+ this.insertRowNodeList = insertRowNodeList;
}
@Override
@@ -64,6 +111,27 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
@Override
public List<InsertNode> splitByPartition(Analysis analysis) {
- return null;
+ Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
+ for (int i = 0; i < insertRowNodeList.size(); i++) {
+ InsertRowNode insertRowNode = insertRowNodeList.get(i);
+ // data region for insert row node
+ RegionReplicaSet dataRegionReplicaSet =
+ analysis
+ .getDataPartitionInfo()
+ .getDataRegionReplicaSetForWriting(
+ devicePath.getFullPath(),
+ StorageEngine.getTimePartitionSlot(insertRowNode.getTime()));
+ if (splitMap.containsKey(dataRegionReplicaSet)) {
+ InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
+ tmpNode.addOneInsertRowNode(insertRowNode, i);
+ } else {
+ InsertRowsNode tmpNode = new InsertRowsNode(this.getPlanNodeId());
+ tmpNode.setDataRegionReplicaSet(dataRegionReplicaSet);
+ tmpNode.addOneInsertRowNode(insertRowNode, i);
+ splitMap.put(dataRegionReplicaSet, tmpNode);
+ }
+ }
+
+ return new ArrayList<>(splitMap.values());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 9cd14429cc..69a52ef24d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -19,10 +19,7 @@
package org.apache.iotdb.db.mpp.sql.statement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
@@ -185,4 +182,18 @@ public abstract class StatementVisitor<R, C> {
public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
return visitStatement(insertRowStatement, context);
}
+
+ public R visitInsertRows(InsertRowsStatement insertRowsStatement, C context) {
+ return visitStatement(insertRowsStatement, context);
+ }
+
+ public R visitInsertMultiTablets(
+ InsertMultiTabletsStatement insertMultiTabletsStatement, C context) {
+ return visitStatement(insertMultiTabletsStatement, context);
+ }
+
+ public R visitInsertRowsOfOneDevice(
+ InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, C context) {
+ return visitStatement(insertRowsOfOneDeviceStatement, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
index 271a70de0f..ab54768a17 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.sql.statement.crud;
-import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
@@ -80,8 +79,6 @@ public abstract class InsertBaseStatement extends Statement {
return failedMeasurements;
}
- public abstract List<TimePartitionSlot> getTimePartitionSlots();
-
public abstract boolean checkDataType(SchemaTree schemaTree);
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java
new file mode 100644
index 0000000000..c2d2d910dd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.crud;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InsertMultiTabletsStatement extends InsertBaseStatement {
+
+ /** the InsertTabletStatement list */
+ List<InsertTabletStatement> insertTabletStatementList;
+
+ public List<InsertTabletStatement> getInsertTabletStatementList() {
+ return insertTabletStatementList;
+ }
+
+ public void setInsertTabletStatementList(List<InsertTabletStatement> insertTabletStatementList) {
+ this.insertTabletStatementList = insertTabletStatementList;
+ }
+
+ public List<PartialPath> getDevicePaths() {
+ List<PartialPath> partialPaths = new ArrayList<>();
+ for (InsertTabletStatement insertTabletStatement : insertTabletStatementList) {
+ partialPaths.add(insertTabletStatement.devicePath);
+ }
+ return partialPaths;
+ }
+
+ public List<String[]> getMeasurementsList() {
+ List<String[]> measurementsList = new ArrayList<>();
+ for (InsertTabletStatement insertTabletStatement : insertTabletStatementList) {
+ measurementsList.add(insertTabletStatement.measurements);
+ }
+ return measurementsList;
+ }
+
+ public List<TSDataType[]> getDataTypesList() {
+ List<TSDataType[]> dataTypesList = new ArrayList<>();
+ for (InsertTabletStatement insertTabletStatement : insertTabletStatementList) {
+ dataTypesList.add(insertTabletStatement.dataTypes);
+ }
+ return dataTypesList;
+ }
+
+ @Override
+ public boolean checkDataType(SchemaTree schemaTree) {
+ for (InsertTabletStatement insertTabletStatement : insertTabletStatementList) {
+ if (!insertTabletStatement.checkDataType(schemaTree)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitInsertMultiTablets(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
index 1c6f3e38e5..6e751bc52e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
@@ -174,7 +174,6 @@ public class InsertRowStatement extends InsertBaseStatement {
dataTypes[index] = null;
}
- @Override
public List<TimePartitionSlot> getTimePartitionSlots() {
return Collections.singletonList(StorageEngine.getTimePartitionSlot(time));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java
new file mode 100644
index 0000000000..ae18e4364d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.crud;
+
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
+
+ /** the InsertRowStatement list */
+ private List<InsertRowStatement> insertRowStatementList;
+
+ public List<InsertRowStatement> getInsertRowStatementList() {
+ return insertRowStatementList;
+ }
+
+ public void setInsertRowStatementList(List<InsertRowStatement> insertRowStatementList) {
+ this.insertRowStatementList = insertRowStatementList;
+
+ // set device path, measurements, and data types
+ if (insertRowStatementList == null || insertRowStatementList.size() == 0) {
+ return;
+ }
+ devicePath = insertRowStatementList.get(0).getDevicePath();
+ Map<String, TSDataType> measurementsAndDataType = new HashMap<>();
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ List<String> measurements = Arrays.asList(insertRowStatement.getMeasurements());
+ Map<String, TSDataType> subMap =
+ measurements.stream()
+ .collect(
+ Collectors.toMap(
+ key -> key, key -> insertRowStatement.dataTypes[measurements.indexOf(key)]));
+ measurementsAndDataType.putAll(subMap);
+ }
+ measurements = measurementsAndDataType.keySet().toArray(new String[0]);
+ dataTypes = measurementsAndDataType.values().toArray(new TSDataType[0]);
+ }
+
+ public List<TimePartitionSlot> getTimePartitionSlots() {
+ Set<TimePartitionSlot> timePartitionSlotSet = new HashSet<>();
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ timePartitionSlotSet.add(StorageEngine.getTimePartitionSlot(insertRowStatement.getTime()));
+ }
+ return new ArrayList<>(timePartitionSlotSet);
+ }
+
+ @Override
+ public boolean checkDataType(SchemaTree schemaTree) {
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ if (!insertRowStatement.checkDataType(schemaTree)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void transferType(SchemaTree schemaTree) throws QueryProcessException {
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ insertRowStatement.transferType(schemaTree);
+ }
+ }
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitInsertRowsOfOneDevice(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java
new file mode 100644
index 0000000000..353d736a97
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.crud;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InsertRowsStatement extends InsertBaseStatement {
+
+ /** the InsertRowStatement list */
+ private List<InsertRowStatement> insertRowStatementList;
+
+ public List<PartialPath> getDevicePaths() {
+ List<PartialPath> partialPaths = new ArrayList<>();
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ partialPaths.add(insertRowStatement.devicePath);
+ }
+ return partialPaths;
+ }
+
+ public List<String[]> getMeasurementsList() {
+ List<String[]> measurementsList = new ArrayList<>();
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ measurementsList.add(insertRowStatement.measurements);
+ }
+ return measurementsList;
+ }
+
+ public List<TSDataType[]> getDataTypesList() {
+ List<TSDataType[]> dataTypesList = new ArrayList<>();
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ dataTypesList.add(insertRowStatement.dataTypes);
+ }
+ return dataTypesList;
+ }
+
+ public List<InsertRowStatement> getInsertRowStatementList() {
+ return insertRowStatementList;
+ }
+
+ public void setInsertRowStatementList(List<InsertRowStatement> insertRowStatementList) {
+ this.insertRowStatementList = insertRowStatementList;
+ }
+
+ @Override
+ public boolean checkDataType(SchemaTree schemaTree) {
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ if (!insertRowStatement.checkDataType(schemaTree)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void transferType(SchemaTree schemaTree) throws QueryProcessException {
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ insertRowStatement.transferType(schemaTree);
+ }
+ }
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitInsertRows(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
index b6d3a46cce..de87ec94c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -81,7 +82,6 @@ public class InsertTabletStatement extends InsertBaseStatement {
columns[index] = null;
}
- @Override
public List<TimePartitionSlot> getTimePartitionSlots() {
List<TimePartitionSlot> result = new ArrayList<>();
long startTime =
@@ -121,4 +121,8 @@ public class InsertTabletStatement extends InsertBaseStatement {
}
return true;
}
+
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitInsertTablet(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 8b721ded67..c0476c794e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
@@ -130,5 +130,5 @@ public interface IPlanExecutor {
*
* @throws QueryProcessException when some of the rows failed
*/
- void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan) throws QueryProcessException;
+ void insertTablet(InsertMultiTabletsPlan insertMultiTabletsPlan) throws QueryProcessException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 3a480afd09..eb6a47f4f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -61,25 +61,8 @@ import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.DeletePartitionPlan;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
-import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
-import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.UDAFPlan;
-import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
@@ -281,7 +264,7 @@ public class PlanExecutor implements IPlanExecutor {
insertTablet((InsertTabletPlan) plan);
return true;
case MULTI_BATCH_INSERT:
- insertTablet((InsertMultiTabletPlan) plan);
+ insertTablet((InsertMultiTabletsPlan) plan);
return true;
case CREATE_ROLE:
case DELETE_ROLE:
@@ -1649,48 +1632,48 @@ public class PlanExecutor implements IPlanExecutor {
}
@Override
- public void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan)
+ public void insertTablet(InsertMultiTabletsPlan insertMultiTabletsPlan)
throws QueryProcessException {
- if (insertMultiTabletPlan.isEnableMultiThreading()) {
- insertTabletParallel(insertMultiTabletPlan);
+ if (insertMultiTabletsPlan.isEnableMultiThreading()) {
+ insertTabletParallel(insertMultiTabletsPlan);
} else {
- insertTabletSerial(insertMultiTabletPlan);
+ insertTabletSerial(insertMultiTabletsPlan);
}
}
- private void insertTabletSerial(InsertMultiTabletPlan insertMultiTabletPlan)
+ private void insertTabletSerial(InsertMultiTabletsPlan insertMultiTabletsPlan)
throws BatchProcessException {
- for (int i = 0; i < insertMultiTabletPlan.getInsertTabletPlanList().size(); i++) {
- if (insertMultiTabletPlan.getResults().containsKey(i)
- || insertMultiTabletPlan.isExecuted(i)) {
+ for (int i = 0; i < insertMultiTabletsPlan.getInsertTabletPlanList().size(); i++) {
+ if (insertMultiTabletsPlan.getResults().containsKey(i)
+ || insertMultiTabletsPlan.isExecuted(i)) {
continue;
}
try {
- insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+ insertTablet(insertMultiTabletsPlan.getInsertTabletPlanList().get(i));
} catch (QueryProcessException e) {
- insertMultiTabletPlan
+ insertMultiTabletsPlan
.getResults()
.put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
- if (!insertMultiTabletPlan.getResults().isEmpty()) {
- throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+ if (!insertMultiTabletsPlan.getResults().isEmpty()) {
+ throw new BatchProcessException(insertMultiTabletsPlan.getFailingStatus());
}
}
- private void insertTabletParallel(InsertMultiTabletPlan insertMultiTabletPlan)
+ private void insertTabletParallel(InsertMultiTabletsPlan insertMultiTabletsPlan)
throws BatchProcessException {
- updateInsertTabletsPool(insertMultiTabletPlan.getDifferentStorageGroupsCount());
+ updateInsertTabletsPool(insertMultiTabletsPlan.getDifferentStorageGroupsCount());
- List<InsertTabletPlan> planList = insertMultiTabletPlan.getInsertTabletPlanList();
+ List<InsertTabletPlan> planList = insertMultiTabletsPlan.getInsertTabletPlanList();
List<Future<?>> futureList = new ArrayList<>();
- Map<Integer, TSStatus> results = insertMultiTabletPlan.getResults();
+ Map<Integer, TSStatus> results = insertMultiTabletsPlan.getResults();
List<InsertTabletPlan> runPlanList = new ArrayList<>();
Map<Integer, Integer> runIndexToRealIndex = new HashMap<>();
for (int i = 0; i < planList.size(); i++) {
- if (!(results.containsKey(i) || insertMultiTabletPlan.isExecuted(i))) {
+ if (!(results.containsKey(i) || insertMultiTabletsPlan.isExecuted(i))) {
runPlanList.add(planList.get(i));
runIndexToRealIndex.put(runPlanList.size() - 1, i);
}
@@ -1721,7 +1704,7 @@ public class PlanExecutor implements IPlanExecutor {
}
if (!results.isEmpty()) {
- throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+ throw new BatchProcessException(insertMultiTabletsPlan.getFailingStatus());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 7e8430bfbe..c825aa35fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -24,13 +24,8 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
@@ -320,7 +315,7 @@ public abstract class PhysicalPlan implements IConsensusRequest {
plan = new InsertTabletPlan();
break;
case MULTI_BATCH_INSERT:
- plan = new InsertMultiTabletPlan();
+ plan = new InsertMultiTabletsPlan();
break;
case DELETE:
plan = new DeletePlan();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletsPlan.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
rename to server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletsPlan.java
index 23f435a501..7d0e431338 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletsPlan.java
@@ -41,19 +41,19 @@ import java.util.TreeMap;
/**
* Mainly used in the distributed version, when multiple InsertTabletPlans belong to a raft
- * replication group, we merge these InsertTabletPlans into one InsertMultiTabletPlan, which can
+ * replication group, we merge these InsertTabletPlans into one InsertMultiTabletsPlan, which can
* reduce the number of raft logs. For details, please refer to
* https://issues.apache.org/jira/browse/IOTDB-1099
*/
-public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
+public class InsertMultiTabletsPlan extends InsertPlan implements BatchPlan {
/**
* the value is used to indict the parent InsertTabletPlan's index when the parent
* InsertTabletPlan is split to multi sub InsertTabletPlans. if the InsertTabletPlan have no
* parent plan, the value is zero;
*
- * <p>suppose we originally have three InsertTabletPlans in one InsertMultiTabletPlan, then the
- * initial InsertMultiTabletPlan would have the following two attributes:
+ * <p>suppose we originally have three InsertTabletPlans in one InsertMultiTabletsPlan, then the
+ * initial InsertMultiTabletsPlan would have the following two attributes:
*
* <p>insertTabletPlanList={InsertTabletPlan_1,InsertTabletPlan_2,InsertTabletPlan_3}
*
@@ -104,19 +104,19 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
Integer differentStorageGroupsCount;
- public InsertMultiTabletPlan() {
+ public InsertMultiTabletsPlan() {
super(OperatorType.MULTI_BATCH_INSERT);
this.insertTabletPlanList = new ArrayList<>();
this.parentInsertTabletPlanIndexList = new ArrayList<>();
}
- public InsertMultiTabletPlan(List<InsertTabletPlan> insertTabletPlanList) {
+ public InsertMultiTabletsPlan(List<InsertTabletPlan> insertTabletPlanList) {
super(OperatorType.MULTI_BATCH_INSERT);
this.insertTabletPlanList = insertTabletPlanList;
this.parentInsertTabletPlanIndexList = new ArrayList<>();
}
- public InsertMultiTabletPlan(
+ public InsertMultiTabletsPlan(
List<InsertTabletPlan> insertTabletPlanList, List<Integer> parentInsertTabletPlanIndexList) {
super(OperatorType.MULTI_BATCH_INSERT);
this.insertTabletPlanList = insertTabletPlanList;
@@ -214,8 +214,8 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
}
/**
- * @param index the index of the sub plan in this InsertMultiTabletPlan
- * @return the parent's index in the parent InsertMultiTabletPlan
+ * @param index the index of the sub plan in this InsertMultiTabletsPlan
+ * @return the parent's index in the parent InsertMultiTabletsPlan
*/
public int getParentIndex(int index) {
if (index >= parentInsertTabletPlanIndexList.size() || index < 0) {
@@ -316,14 +316,14 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
public void setIndex(long index) {
super.setIndex(index);
for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
- // use the InsertMultiTabletPlan's index as the sub InsertTabletPlan's index
+ // use the InsertMultiTabletsPlan's index as the sub InsertTabletPlan's index
insertTabletPlan.setIndex(index);
}
}
@Override
public String toString() {
- return "InsertMultiTabletPlan{"
+ return "InsertMultiTabletsPlan{"
+ " insertTabletPlanList="
+ insertTabletPlanList
+ ", parentInsetTablePlanIndexList="
@@ -340,7 +340,7 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
return false;
}
- InsertMultiTabletPlan that = (InsertMultiTabletPlan) o;
+ InsertMultiTabletsPlan that = (InsertMultiTabletsPlan) o;
if (!Objects.equals(insertTabletPlanList, that.insertTabletPlanList)) {
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 3472c29ebf..fdd6b1af87 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -40,19 +40,11 @@ import org.apache.iotdb.db.mpp.execution.Coordinator;
import org.apache.iotdb.db.mpp.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
-import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.*;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
-import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
@@ -982,19 +974,19 @@ public class TSServiceImpl implements TSIService.Iface {
private TSStatus insertTabletsInternally(
List<InsertTabletPlan> insertTabletPlans, long sessionId) {
- InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan();
+ InsertMultiTabletsPlan insertMultiTabletsPlan = new InsertMultiTabletsPlan();
for (int i = 0; i < insertTabletPlans.size(); i++) {
InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i);
TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, sessionId);
if (status != null) {
// not authorized
- insertMultiTabletPlan.getResults().put(i, status);
+ insertMultiTabletsPlan.getResults().put(i, status);
}
}
- insertMultiTabletPlan.setInsertTabletPlanList(insertTabletPlans);
+ insertMultiTabletsPlan.setInsertTabletPlanList(insertTabletPlans);
- return executeNonQueryPlan(insertMultiTabletPlan);
+ return executeNonQueryPlan(insertMultiTabletsPlan);
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@@ -1236,6 +1228,46 @@ public class TSServiceImpl implements TSIService.Iface {
allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
}
+ public TSStatus insertRecordsV2(TSInsertRecordsReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecords, first device {}, first time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.prefixPaths.get(0),
+ req.getTimestamps().get(0));
+ }
+
+ // Step 1: TODO(INSERT) transfer from TSInsertRecordsReq to Statement
+ InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ coordinator.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ QueryType.WRITE,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
private TSStatus judgeFinalTsStatus(
boolean allCheckSuccess,
TSStatus executeTsStatus,
@@ -1303,6 +1335,47 @@ public class TSServiceImpl implements TSIService.Iface {
return resp;
}
+ public TSStatus insertRecordsOfOneDeviceV2(TSInsertRecordsOfOneDeviceReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecords, device {}, first time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.prefixPath,
+ req.getTimestamps().get(0));
+ }
+
+ // Step 1: TODO(INSERT) transfer from TSInsertRecordsOfOneDeviceReq to Statement
+ InsertRowsOfOneDeviceStatement statement =
+ (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ coordinator.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ QueryType.WRITE,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
@Override
public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1361,6 +1434,47 @@ public class TSServiceImpl implements TSIService.Iface {
allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.timestamps.size());
}
+ public TSStatus insertStringRecordsOfOneDeviceV2(TSInsertStringRecordsOfOneDeviceReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecords, device {}, first time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.prefixPath,
+ req.getTimestamps().get(0));
+ }
+
+ // Step 1: TODO(INSERT) transfer from TSInsertStringRecordsOfOneDeviceReq to Statement
+ InsertRowsOfOneDeviceStatement statement =
+ (InsertRowsOfOneDeviceStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ coordinator.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ QueryType.WRITE,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
@Override
public TSStatus insertStringRecords(TSInsertStringRecordsReq req) {
if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
@@ -1414,6 +1528,44 @@ public class TSServiceImpl implements TSIService.Iface {
allCheckSuccess, tsStatus, insertRowsPlan.getResults(), req.prefixPaths.size());
}
+ public TSStatus insertStringRecordsV2(TSInsertStringRecordsReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecords, first device {}, first time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.prefixPaths.get(0),
+ req.getTimestamps().get(0));
+ }
+
+ InsertRowsStatement statement = (InsertRowsStatement) StatementGenerator.createStatement(req);
+
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ coordinator.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ QueryType.WRITE,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
private void addMeasurementAndValue(
InsertRowPlan insertRowPlan, List<String> measurements, List<String> values) {
List<String> newMeasurements = new ArrayList<>(measurements.size());
@@ -1572,6 +1724,43 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
+ public TSStatus insertStringRecordV2(TSInsertStringRecordReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ AUDIT_LOGGER.debug(
+ "Session {} insertRecord, device {}, time {}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.getPrefixPath(),
+ req.getTimestamp());
+
+ InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ coordinator.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ QueryType.WRITE,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
@Override
public TSStatus deleteData(TSDeleteDataReq req) {
try {
@@ -1685,6 +1874,38 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
+ public TSStatus insertTabletsV2(TSInsertTabletsReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ // Step 1: TODO(INSERT) transfer from TSInsertTabletsReq to Statement
+ InsertMultiTabletsStatement statement =
+ (InsertMultiTabletsStatement) StatementGenerator.createStatement(req);
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ coordinator.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ QueryType.WRITE,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
private InsertTabletPlan constructInsertTabletPlan(TSInsertTabletsReq req, int i)
throws IllegalPathException {
InsertTabletPlan insertTabletPlan =
@@ -1706,22 +1927,22 @@ public class TSServiceImpl implements TSIService.Iface {
return insertTabletPlan;
}
- /** construct one InsertMultiTabletPlan and process it */
+ /** construct one InsertMultiTabletsPlan and process it */
public TSStatus insertTabletsInternally(TSInsertTabletsReq req) throws IllegalPathException {
List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
- InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan();
+ InsertMultiTabletsPlan insertMultiTabletsPlan = new InsertMultiTabletsPlan();
for (int i = 0; i < req.prefixPaths.size(); i++) {
InsertTabletPlan insertTabletPlan = constructInsertTabletPlan(req, i);
TSStatus status = SESSION_MANAGER.checkAuthority(insertTabletPlan, req.getSessionId());
if (status != null) {
// not authorized
- insertMultiTabletPlan.getResults().put(i, status);
+ insertMultiTabletsPlan.getResults().put(i, status);
}
insertTabletPlanList.add(insertTabletPlan);
}
- insertMultiTabletPlan.setInsertTabletPlanList(insertTabletPlanList);
- return executeNonQueryPlan(insertMultiTabletPlan);
+ insertMultiTabletsPlan.setInsertTabletPlanList(insertTabletPlanList);
+ return executeNonQueryPlan(insertMultiTabletsPlan);
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
index a5a3d84077..032ef810b6 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/InsertWithIDTableTest.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -743,9 +743,10 @@ public class InsertWithIDTableTest {
}
PlanExecutor executor = new PlanExecutor();
- InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(insertTabletPlanList);
+ InsertMultiTabletsPlan insertMultiTabletsPlan =
+ new InsertMultiTabletsPlan(insertTabletPlanList);
- executor.insertTablet(insertMultiTabletPlan);
+ executor.insertTablet(insertMultiTabletsPlan);
QueryPlan queryPlan =
(QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.multi.**");
QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
index 429fb02e12..d7801c9bb8 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -89,9 +89,10 @@ public class InsertTabletMultiPlanTest extends InsertTabletPlanTest {
}
PlanExecutor executor = new PlanExecutor();
- InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(insertTabletPlanList);
+ InsertMultiTabletsPlan insertMultiTabletsPlan =
+ new InsertMultiTabletsPlan(insertTabletPlanList);
- executor.insertTablet(insertMultiTabletPlan);
+ executor.insertTablet(insertMultiTabletsPlan);
QueryPlan queryPlan =
(QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.multi.**");
QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
@@ -183,9 +184,10 @@ public class InsertTabletMultiPlanTest extends InsertTabletPlanTest {
}
PlanExecutor executor = new PlanExecutor();
- InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(insertTabletPlanList);
- Assert.assertTrue(insertMultiTabletPlan.isEnableMultiThreading());
- executor.insertTablet(insertMultiTabletPlan);
+ InsertMultiTabletsPlan insertMultiTabletsPlan =
+ new InsertMultiTabletsPlan(insertTabletPlanList);
+ Assert.assertTrue(insertMultiTabletsPlan.isEnableMultiThreading());
+ executor.insertTablet(insertMultiTabletsPlan);
QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.**");
QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
@@ -278,9 +280,10 @@ public class InsertTabletMultiPlanTest extends InsertTabletPlanTest {
}
PlanExecutor executor = new PlanExecutor();
- InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(insertTabletPlanList);
+ InsertMultiTabletsPlan insertMultiTabletsPlan =
+ new InsertMultiTabletsPlan(insertTabletPlanList);
- executor.insertTablet(insertMultiTabletPlan);
+ executor.insertTablet(insertMultiTabletsPlan);
for (int i = 0; i < 1000; i++) {
QueryPlan queryPlan =