You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/02/23 11:44:30 UTC
[iotdb] branch master updated: [IOTDB-1163]optimize the
insertRecords session interface for cluster version (#2698)
This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b4544d8 [IOTDB-1163]optimize the insertRecords session interface for cluster version (#2698)
b4544d8 is described below
commit b4544d8fc55cd8fa2005e14b5cfe797ba376d294
Author: HouliangQi <ne...@163.com>
AuthorDate: Tue Feb 23 19:44:07 2021 +0800
[IOTDB-1163]optimize the insertRecords session interface for cluster version (#2698)
---
.../iotdb/cluster/coordinator/Coordinator.java | 33 +++-
.../cluster/log/applier/AsyncDataLogApplier.java | 8 +
.../iotdb/cluster/log/applier/DataLogApplier.java | 11 ++
.../apache/iotdb/cluster/metadata/CMManager.java | 34 +++-
.../iotdb/cluster/query/ClusterPlanRouter.java | 35 +++-
.../apache/iotdb/db/qp/executor/IPlanExecutor.java | 8 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 27 +++
.../org/apache/iotdb/db/qp/logical/Operator.java | 3 +-
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 8 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 10 +-
.../physical/crud/InsertRowsOfOneDevicePlan.java | 4 +-
.../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 219 +++++++++++++++++++++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 87 +++++---
13 files changed, 441 insertions(+), 46 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 0ad14f4..f319dc6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -306,8 +307,10 @@ public class Coordinator {
} else {
if (plan instanceof InsertTabletPlan
|| plan instanceof InsertMultiTabletPlan
- || plan instanceof CreateMultiTimeSeriesPlan) {
- // InsertTabletPlan, InsertMultiTabletPlan and CreateMultiTimeSeriesPlan contains many rows,
+ || plan instanceof CreateMultiTimeSeriesPlan
+ || plan instanceof InsertRowsPlan) {
+ // InsertTabletPlan, InsertMultiTabletPlan, InsertRowsPlan and CreateMultiTimeSeriesPlan
+ // contain many rows,
// each will correspond to a TSStatus as its execution result,
// as the plan is split and the sub-plans may have interleaving ranges,
// we must assure that each TSStatus is placed to the right position
@@ -455,8 +458,11 @@ public class Coordinator {
// and the second dimension is the number of rows per InsertTabletPlan
totalRowNum = ((InsertMultiTabletPlan) parentPlan).getTabletsSize();
} else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
- totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size();
+ totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getPaths().size();
+ } else if (parentPlan instanceof InsertRowsPlan) {
+ totalRowNum = ((InsertRowsPlan) parentPlan).getRowCount();
}
+
if (subStatus == null) {
subStatus = new TSStatus[totalRowNum];
Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
@@ -492,8 +498,14 @@ public class Coordinator {
for (int i = 0; i < subPlan.getIndexes().size(); i++) {
subStatus[subPlan.getIndexes().get(i)] = tmpStatus.subStatus.get(i);
}
+ } else if (parentPlan instanceof InsertRowsPlan) {
+ InsertRowsPlan subPlan = (InsertRowsPlan) entry.getKey();
+ for (int i = 0; i < subPlan.getInsertRowPlanIndexList().size(); i++) {
+ subStatus[subPlan.getInsertRowPlanIndexList().get(i)] = tmpStatus.subStatus.get(i);
+ }
}
}
+
if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// execution failed, record the error message
errorCodePartitionGroups.add(
@@ -549,6 +561,21 @@ public class Coordinator {
subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue();
}
}
+
+ if (parentPlan instanceof InsertRowsPlan
+ && !((InsertRowsPlan) parentPlan).getResults().isEmpty()) {
+ if (subStatus == null) {
+ subStatus = new TSStatus[totalRowNum];
+ Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+ }
+ noFailure = false;
+ isBatchFailure = true;
+ for (Map.Entry<Integer, TSStatus> integerTSStatusEntry :
+ ((InsertRowsPlan) parentPlan).getResults().entrySet()) {
+ subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue();
+ }
+ }
+
return concludeFinalStatus(
noFailure, endPoint, isBatchFailure, subStatus, errorCodePartitionGroups);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index 8c4c2eb..65a5b2d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.service.IoTDB;
@@ -142,6 +143,10 @@ public class AsyncDataLogApplier implements LogApplier {
* same. this is done in {@link
* org.apache.iotdb.cluster.query.ClusterPlanRouter#splitAndRoutePlan(InsertMultiTabletPlan)}
*
+ * <p>We can also be sure that the storage groups of all InsertRowPlans in an InsertRowsPlan
+ * are the same; this is done in {@link
+ * org.apache.iotdb.cluster.query.ClusterPlanRouter#splitAndRoutePlan(InsertRowsPlan)}
+ *
* @return the sg that the plan belongs to
* @throws StorageGroupNotSetException if no sg found
*/
@@ -150,6 +155,9 @@ public class AsyncDataLogApplier implements LogApplier {
if (plan instanceof InsertMultiTabletPlan) {
PartialPath deviceId = ((InsertMultiTabletPlan) plan).getFirstDeviceId();
sgPath = IoTDB.metaManager.getStorageGroupPath(deviceId);
+ } else if (plan instanceof InsertRowsPlan) {
+ PartialPath path = ((InsertRowsPlan) plan).getFirstDeviceId();
+ sgPath = IoTDB.metaManager.getStorageGroupPath(path);
} else if (plan instanceof InsertPlan) {
PartialPath deviceId = ((InsertPlan) plan).getDeviceId();
sgPath = IoTDB.metaManager.getStorageGroupPath(deviceId);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 5eecc7d..7d11d3e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -37,6 +37,8 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.service.IoTDB;
@@ -67,6 +69,8 @@ public class DataLogApplier extends BaseApplier {
PhysicalPlan plan = physicalPlanLog.getPlan();
if (plan instanceof InsertMultiTabletPlan) {
applyInsert((InsertMultiTabletPlan) plan);
+ } else if (plan instanceof InsertRowsPlan) {
+ applyInsert((InsertRowsPlan) plan);
} else if (plan instanceof InsertPlan) {
applyInsert((InsertPlan) plan);
} else {
@@ -101,6 +105,13 @@ public class DataLogApplier extends BaseApplier {
}
}
+ /**
+  * Applies an InsertRowsPlan by applying each of its InsertRowPlans one after another, in list
+  * order. Any exception raised while applying a row propagates to the caller immediately.
+  */
+ private void applyInsert(InsertRowsPlan plan)
+     throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
+   for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) {
+     applyInsert(plan.getInsertRowPlanList().get(i));
+   }
+ }
+
private void applyInsert(InsertPlan plan)
throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
// check if the corresponding slot is being pulled
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 50c3732..e3a58f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -431,8 +432,12 @@ public class CMManager extends MManager {
public void createSchema(PhysicalPlan plan) throws MetadataException, CheckConsistencyException {
// try to set storage group
List<PartialPath> deviceIds;
- // only handle InsertPlan, CreateTimeSeriesPlan and CreateMultiTimeSeriesPlan currently
- if (plan instanceof InsertPlan && !(plan instanceof InsertMultiTabletPlan)) {
+ // only handle InsertPlan, CreateTimeSeriesPlan and CreateMultiTimeSeriesPlan currently
+ if (plan instanceof InsertPlan
+ && !(plan instanceof InsertMultiTabletPlan)
+ && !(plan instanceof InsertRowsPlan)) {
+ // InsertMultiTabletPlan and InsertRowsPlan have multiple devices, and other types of
+ // InsertPlan have only one device.
deviceIds = Collections.singletonList(((InsertPlan) plan).getDeviceId());
} else if (plan instanceof CreateTimeSeriesPlan) {
deviceIds = Collections.singletonList(((CreateTimeSeriesPlan) plan).getPath());
@@ -528,7 +533,26 @@ public class CMManager extends MManager {
boolean success = createTimeseries(insertTabletPlan);
allSuccess = allSuccess && success;
if (!success) {
- logger.error("create timeseries for device={} failed", insertTabletPlan.getDeviceId());
+ logger.error(
+ "create timeseries for device={} failed, plan={}",
+ insertTabletPlan.getDeviceId(),
+ insertTabletPlan);
+ }
+ }
+ return allSuccess;
+ }
+
+ public boolean createTimeseries(InsertRowsPlan insertRowsPlan)
+ throws CheckConsistencyException, IllegalPathException {
+ boolean allSuccess = true;
+ for (InsertRowPlan insertRowPlan : insertRowsPlan.getInsertRowPlanList()) {
+ boolean success = createTimeseries(insertRowPlan);
+ allSuccess = allSuccess && success;
+ if (!success) {
+ logger.error(
+ "create timeseries for device={} failed, plan={}",
+ insertRowPlan.getDeviceId(),
+ insertRowPlan);
}
}
return allSuccess;
@@ -546,6 +570,10 @@ public class CMManager extends MManager {
return createTimeseries((InsertMultiTabletPlan) insertPlan);
}
+ if (insertPlan instanceof InsertRowsPlan) {
+ return createTimeseries((InsertRowsPlan) insertPlan);
+ }
+
List<String> seriesList = new ArrayList<>();
PartialPath deviceId = insertPlan.getDeviceId();
PartialPath storageGroupName;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index c573449..acb0b77 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CountPlan;
@@ -52,6 +53,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
public class ClusterPlanRouter {
@@ -110,7 +112,9 @@ public class ClusterPlanRouter {
public Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(PhysicalPlan plan)
throws UnsupportedPlanException, MetadataException {
- if (plan instanceof InsertTabletPlan) {
+ if (plan instanceof InsertRowsPlan) {
+ return splitAndRoutePlan((InsertRowsPlan) plan);
+ } else if (plan instanceof InsertTabletPlan) {
return splitAndRoutePlan((InsertTabletPlan) plan);
} else if (plan instanceof InsertMultiTabletPlan) {
return splitAndRoutePlan((InsertMultiTabletPlan) plan);
@@ -227,6 +231,35 @@ public class ClusterPlanRouter {
return result;
}
+ /**
+  * Splits an InsertRowsPlan into one sub-plan per partition group. Every row is routed by the
+  * storage group of its device together with its timestamp; rows that land in the same
+  * partition group are collected into the same sub-plan, each remembered with its position in
+  * the original plan.
+  *
+  * @param insertRowsPlan the plan to split
+  * @return a map whose keys are the sub-plans and whose values are the partition groups the
+  *     sub-plans were routed to
+  */
+ private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowsPlan insertRowsPlan)
+     throws MetadataException {
+   Map<PartitionGroup, InsertRowsPlan> plansByGroup = new HashMap<>();
+   List<InsertRowPlan> rows = insertRowsPlan.getInsertRowPlanList();
+   for (int rowIdx = 0; rowIdx < rows.size(); rowIdx++) {
+     InsertRowPlan row = rows.get(rowIdx);
+     PartialPath sgPath = getMManager().getStorageGroupPath(row.getDeviceId());
+     PartitionGroup target = partitionTable.route(sgPath.getFullPath(), row.getTime());
+     InsertRowsPlan groupPlan = plansByGroup.get(target);
+     if (groupPlan == null) {
+       groupPlan = new InsertRowsPlan();
+       plansByGroup.put(target, groupPlan);
+     }
+     groupPlan.addOneInsertRowPlan(row, rowIdx);
+   }
+
+   // Invert the mapping only once every sub-plan is complete: a sub-plan's hash code depends
+   // on the rows it holds, so it must not change after being used as a key.
+   Map<PhysicalPlan, PartitionGroup> routed = new HashMap<>();
+   for (Entry<PartitionGroup, InsertRowsPlan> groupAndPlan : plansByGroup.entrySet()) {
+     routed.put(groupAndPlan.getValue(), groupAndPlan.getKey());
+   }
+   return routed;
+ }
+
@SuppressWarnings("SuspiciousSystemArraycopy")
private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertTabletPlan plan)
throws MetadataException {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 76defce..af24983 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -99,6 +100,13 @@ public interface IPlanExecutor {
/**
* execute insert command and return whether the operator is successful.
*
+ * @param insertRowsPlan physical insert rows plan, which contains multiple InsertRowPlans
+ */
+ void insert(InsertRowsPlan insertRowsPlan) throws QueryProcessException;
+
+ /**
+ * execute insert command and return whether the operator is successful.
+ *
* @param insertRowsOfOneDevicePlan physical insert plan
*/
void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 299bcbb..0eb5e7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -67,6 +67,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
@@ -115,6 +116,8 @@ import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
@@ -221,6 +224,9 @@ public class PlanExecutor implements IPlanExecutor {
case BATCH_INSERT_ONE_DEVICE:
insert((InsertRowsOfOneDevicePlan) plan);
return true;
+ case BATCH_INSERT_ROWS:
+ insert((InsertRowsPlan) plan);
+ return true;
case BATCHINSERT:
insertTablet((InsertTabletPlan) plan);
return true;
@@ -1095,6 +1101,27 @@ public class PlanExecutor implements IPlanExecutor {
}
@Override
+ public void insert(InsertRowsPlan plan) throws QueryProcessException {
+   // Executes every row of the batch and records one TSStatus per row in plan.getResults().
+   // Rows that already carry a result (e.g. a failed authority check recorded by the caller)
+   // are skipped and count as failures. A failing row does not stop the remaining rows from
+   // being inserted; a BatchProcessException carrying the recorded statuses is thrown only
+   // after the whole batch has been processed.
+   boolean allSuccess = true;
+   for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) {
+     if (plan.getResults().containsKey(i)) {
+       allSuccess = false;
+       continue;
+     }
+     try {
+       insert(plan.getInsertRowPlanList().get(i));
+       plan.getResults().put(i, RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+     } catch (QueryProcessException e) {
+       plan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+       allSuccess = false;
+     }
+   }
+   // The failure check used to sit inside the loop, which aborted the batch at the first
+   // failure and reported a truncated status array; it must run once, after all rows.
+   if (!allSuccess) {
+     throw new BatchProcessException(plan.getResults().values().toArray(new TSStatus[0]));
+   }
+ }
+
+ @Override
public void insert(InsertRowPlan insertRowPlan) throws QueryProcessException {
try {
insertRowPlan.setMeasurementMNodes(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 3be1099..d1e9971 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -145,6 +145,7 @@ public abstract class Operator {
MEASUREMENT_MNODE,
STORAGE_GROUP_MNODE,
BATCH_INSERT_ONE_DEVICE,
- MULTI_BATCH_INSERT;
+ MULTI_BATCH_INSERT,
+ BATCH_INSERT_ROWS
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 29888ea..b329812 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
@@ -348,6 +349,10 @@ public abstract class PhysicalPlan {
plan = new StorageGroupMNodePlan();
plan.deserialize(buffer);
break;
+ case BATCH_INSERT_ROWS:
+ plan = new InsertRowsPlan();
+ plan.deserialize(buffer);
+ break;
default:
throw new IOException("unrecognized log type " + type);
}
@@ -390,7 +395,8 @@ public abstract class PhysicalPlan {
MEASUREMENT_MNODE,
STORAGE_GROUP_MNODE,
BATCH_INSERT_ONE_DEVICE,
- MULTI_BATCH_INSERT
+ MULTI_BATCH_INSERT,
+ BATCH_INSERT_ROWS
}
public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 58b80c8..8399527 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -290,8 +290,11 @@ public class InsertRowPlan extends InsertPlan {
public void serialize(DataOutputStream stream) throws IOException {
int type = PhysicalPlanType.INSERT.ordinal();
stream.writeByte((byte) type);
- stream.writeLong(time);
+ subSerialize(stream);
+ }
+ /**
+ * Writes the body of this row to the stream without the leading plan-type byte: the time as a
+ * long, then the device's full path, then the measurements and values. serialize(stream)
+ * writes the type byte and then delegates here.
+ */
+ public void subSerialize(DataOutputStream stream) throws IOException {
+ stream.writeLong(time);
putString(stream, deviceId.getFullPath());
serializeMeasurementsAndValues(stream);
}
@@ -435,8 +438,11 @@ public class InsertRowPlan extends InsertPlan {
public void serialize(ByteBuffer buffer) {
int type = PhysicalPlanType.INSERT.ordinal();
buffer.put((byte) type);
- buffer.putLong(time);
+ subSerialize(buffer);
+ }
+ /**
+ * Writes the body of this row to the buffer without the leading plan-type byte: the time as a
+ * long, then the device's full path, then the measurements and values. serialize(buffer)
+ * writes the type byte and then delegates here.
+ */
+ public void subSerialize(ByteBuffer buffer) {
+ buffer.putLong(time);
putString(buffer, deviceId.getFullPath());
serializeMeasurementsAndValues(buffer);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index e3211f4..e13dc83 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -108,11 +108,10 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan {
@Override
public void serialize(ByteBuffer buffer) {
- int type = PhysicalPlanType.INSERT.ordinal();
+ int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
buffer.put((byte) type);
putString(buffer, deviceId.getFullPath());
-
buffer.putInt(rowPlans.length);
for (InsertRowPlan plan : rowPlans) {
buffer.putLong(plan.getTime());
@@ -122,7 +121,6 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan {
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
-
this.deviceId = new PartialPath(readString(buffer));
this.rowPlans = new InsertRowPlan[buffer.getInt()];
for (int i = 0; i < rowPlans.length; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
new file mode 100644
index 0000000..5e2f484
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.crud;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Physical plan that batches several {@link InsertRowPlan}s. Besides the rows themselves it
+ * keeps, for every row, an index (see insertRowPlanIndexList) and a map of per-row results.
+ */
+public class InsertRowsPlan extends InsertPlan {
+ /**
+ * Index of each contained InsertRowPlan in the original (unsplit) InsertRowsPlan.
+ *
+ * <p>Suppose there is an InsertRowsPlan which contains 5 InsertRowPlans,
+ * insertRowPlanList={InsertRowPlan_0, InsertRowPlan_1, InsertRowPlan_2, InsertRowPlan_3,
+ * InsertRowPlan_4}; then insertRowPlanIndexList={0, 1, 2, 3, 4}. When that InsertRowsPlan is
+ * split into two InsertRowsPlans in the cluster version, and InsertRowsPlan_1's
+ * insertRowPlanList = {InsertRowPlan_0, InsertRowPlan_3, InsertRowPlan_4}, then
+ * InsertRowsPlan_1's insertRowPlanIndexList = {0, 3, 4}; InsertRowsPlan_2's insertRowPlanList =
+ * {InsertRowPlan_1, InsertRowPlan_2} and InsertRowsPlan_2's insertRowPlanIndexList = {1, 2}.
+ */
+ private List<Integer> insertRowPlanIndexList;
+
+ /** the InsertRowPlans (rows) contained in this plan */
+ private List<InsertRowPlan> insertRowPlanList;
+
+ /** per-row insert results, keyed by row index; not written by the serialize methods */
+ private Map<Integer, TSStatus> results = new HashMap<>();
+
+ /** Creates an empty plan; rows are added with {@link #addOneInsertRowPlan}. */
+ public InsertRowsPlan() {
+ super(OperatorType.BATCH_INSERT_ROWS);
+ insertRowPlanList = new ArrayList<>();
+ insertRowPlanIndexList = new ArrayList<>();
+ }
+
+ /**
+ * @return the smallest getMinTime() among the contained rows, or Long.MAX_VALUE when the plan
+ *     holds no rows
+ */
+ @Override
+ public long getMinTime() {
+ long minTime = Long.MAX_VALUE;
+ for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+ if (insertRowPlan.getMinTime() < minTime) {
+ minTime = insertRowPlan.getMinTime();
+ }
+ }
+ return minTime;
+ }
+
+ /** @return a new list with the paths of all contained rows, concatenated in row order */
+ @Override
+ public List<PartialPath> getPaths() {
+ List<PartialPath> result = new ArrayList<>();
+ for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+ result.addAll(insertRowPlan.getPaths());
+ }
+ return result;
+ }
+
+ /** Runs checkIntegrity() on every contained row; the first failing row's exception propagates. */
+ @Override
+ public void checkIntegrity() throws QueryProcessException {
+ for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+ insertRowPlan.checkIntegrity();
+ }
+ }
+
+ /** Delegates recoverFromFailure() to every contained row. */
+ @Override
+ public void recoverFromFailure() {
+ for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+ insertRowPlan.recoverFromFailure();
+ }
+ }
+
+ /**
+ * Returns null when the superclass implementation returns null. Otherwise shrinks this plan in
+ * place to the rows that have failed values (each replaced by its own getPlanFromFailed()) and
+ * returns this.
+ *
+ * <p>NOTE(review): the rebuilt index list stores each row's position in the current list, not
+ * the index previously held in insertRowPlanIndexList -- confirm this is intended for plans
+ * that were split by the cluster router.
+ */
+ @Override
+ public InsertPlan getPlanFromFailed() {
+ if (super.getPlanFromFailed() == null) {
+ return null;
+ }
+ List<InsertRowPlan> plans = new ArrayList<>();
+ List<Integer> indexes = new ArrayList<>();
+ for (int i = 0; i < insertRowPlanList.size(); i++) {
+ if (insertRowPlanList.get(i).hasFailedValues()) {
+ plans.add((InsertRowPlan) insertRowPlanList.get(i).getPlanFromFailed());
+ indexes.add(i);
+ }
+ }
+ this.insertRowPlanList = plans;
+ this.insertRowPlanIndexList = indexes;
+ return this;
+ }
+
+ /** Two plans are equal when their index lists, row lists and result maps are all equal. */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ InsertRowsPlan that = (InsertRowsPlan) o;
+
+ if (!Objects.equals(insertRowPlanIndexList, that.insertRowPlanIndexList)) {
+ return false;
+ }
+ if (!Objects.equals(insertRowPlanList, that.insertRowPlanList)) {
+ return false;
+ }
+ return Objects.equals(results, that.results);
+ }
+
+ /**
+ * Hash over the same three fields equals() compares. The value changes whenever rows or
+ * results are added, so a plan should not be modified while it is used as a hash key.
+ */
+ @Override
+ public int hashCode() {
+ int result = insertRowPlanIndexList != null ? insertRowPlanIndexList.hashCode() : 0;
+ result = 31 * result + (insertRowPlanList != null ? insertRowPlanList.hashCode() : 0);
+ result = 31 * result + (results != null ? results.hashCode() : 0);
+ return result;
+ }
+
+ /**
+ * Layout: plan-type byte, row count (int), every row via InsertRowPlan.subSerialize (i.e.
+ * without a per-row type byte), then one int per entry of insertRowPlanIndexList. The results
+ * map is not written.
+ */
+ @Override
+ public void serialize(ByteBuffer buffer) {
+ int type = PhysicalPlanType.BATCH_INSERT_ROWS.ordinal();
+ buffer.put((byte) type);
+ buffer.putInt(insertRowPlanList.size());
+ for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+ insertRowPlan.subSerialize(buffer);
+ }
+ for (Integer index : insertRowPlanIndexList) {
+ buffer.putInt(index);
+ }
+ }
+
+ /** Stream variant of {@link #serialize(ByteBuffer)}; writes the same layout. */
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ int type = PhysicalPlanType.BATCH_INSERT_ROWS.ordinal();
+ stream.writeByte((byte) type);
+ stream.writeInt(insertRowPlanList.size());
+ for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+ insertRowPlan.subSerialize(stream);
+ }
+ for (Integer index : insertRowPlanIndexList) {
+ stream.writeInt(index);
+ }
+ }
+
+ /**
+ * Reads the row count, then that many rows, then that many indexes, replacing the current
+ * lists. The plan-type byte is not read here -- presumably the caller has already consumed it
+ * to pick this class. The results map is left untouched.
+ */
+ @Override
+ public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+ int size = buffer.getInt();
+ this.insertRowPlanList = new ArrayList<>(size);
+ this.insertRowPlanIndexList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ InsertRowPlan insertRowPlan = new InsertRowPlan();
+ insertRowPlan.deserialize(buffer);
+ insertRowPlanList.add(insertRowPlan);
+ }
+
+ for (int i = 0; i < size; i++) {
+ insertRowPlanIndexList.add(buffer.getInt());
+ }
+ }
+
+ /** @return the live, mutable result map (callers add per-row statuses to it directly) */
+ public Map<Integer, TSStatus> getResults() {
+ return results;
+ }
+
+ /**
+ * Appends a row together with its index.
+ *
+ * @param plan the row to add
+ * @param index the index recorded for the row in insertRowPlanIndexList
+ */
+ public void addOneInsertRowPlan(InsertRowPlan plan, int index) {
+ insertRowPlanList.add(plan);
+ insertRowPlanIndexList.add(index);
+ }
+
+ /** @return the live index list, parallel to {@link #getInsertRowPlanList()} */
+ public List<Integer> getInsertRowPlanIndexList() {
+ return insertRowPlanIndexList;
+ }
+
+ /** @return the live list of contained rows */
+ public List<InsertRowPlan> getInsertRowPlanList() {
+ return insertRowPlanList;
+ }
+
+ /** @return the number of rows currently held by this plan */
+ public int getRowCount() {
+ return insertRowPlanList.size();
+ }
+
+ /**
+ * @return the device of the first row; throws IndexOutOfBoundsException when the plan holds no
+ *     rows
+ */
+ public PartialPath getFirstDeviceId() {
+ return insertRowPlanList.get(0).getDeviceId();
+ }
+
+ @Override
+ public String toString() {
+ return "InsertRowsPlan{"
+ + " insertRowPlanIndexList="
+ + insertRowPlanIndexList
+ + ", insertRowPlanList="
+ + insertRowPlanList
+ + ", results="
+ + results
+ + "}";
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 55b5a1f..60feccf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -141,6 +142,7 @@ import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -1141,10 +1143,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
req.deviceIds.get(0),
req.getTimestamps().get(0));
}
-
- List<TSStatus> statusList = new ArrayList<>();
-
- boolean isAllSuccessful = true;
+ boolean allSuccess = true;
+ InsertRowsPlan insertRowsPlan = new InsertRowsPlan();
for (int i = 0; i < req.deviceIds.size(); i++) {
try {
InsertRowPlan plan =
@@ -1154,23 +1154,46 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
req.getMeasurementsList().get(i).toArray(new String[0]),
req.valuesList.get(i));
TSStatus status = checkAuthority(plan, req.getSessionId());
- if (status == null) {
- status = executeNonQueryPlan(plan);
- isAllSuccessful =
- ((status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
- && isAllSuccessful);
+ if (status != null) {
+ insertRowsPlan.getResults().put(i, status);
+ allSuccess = false;
}
- statusList.add(status);
+ insertRowsPlan.addOneInsertRowPlan(plan, i);
} catch (Exception e) {
- isAllSuccessful = false;
- statusList.add(
- onNPEOrUnexpectedException(e, "inserting records", TSStatusCode.INTERNAL_SERVER_ERROR));
+ allSuccess = false;
+ insertRowsPlan
+ .getResults()
+ .put(
+ i,
+ onNPEOrUnexpectedException(
+ e, "inserting records", TSStatusCode.INTERNAL_SERVER_ERROR));
}
}
+ TSStatus tsStatus = executeNonQueryPlan(insertRowsPlan);
+
+ return judgeFinalTsStatus(
+ allSuccess, tsStatus, insertRowsPlan.getResults(), req.deviceIds.size());
+ }
+
+ /**
+ * Combines the status returned by executing a batch with the per-row statuses that were
+ * recorded for it.
+ *
+ * @param allSuccess when true, executeTsStatus is returned unchanged
+ * @param executeTsStatus the status produced by executing the batch plan
+ * @param checkTsStatus per-row statuses keyed by row index; each entry overwrites the
+ *     sub-status at its index
+ * @param totalRowCount number of sub-statuses to create (all SUCCESS_STATUS) when
+ *     executeTsStatus carries no subStatus list
+ * @return executeTsStatus itself, or a status built by RpcUtils.getStatus from the merged
+ *     sub-status list
+ */
+ private TSStatus judgeFinalTsStatus(
+ boolean allSuccess,
+ TSStatus executeTsStatus,
+ Map<Integer, TSStatus> checkTsStatus,
+ int totalRowCount) {
+
+ if (allSuccess) {
+ return executeTsStatus;
+ }
- return isAllSuccessful
- ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
- : RpcUtils.getStatus(statusList);
+ if (executeTsStatus.subStatus == null) {
+ TSStatus[] tmpSubTsStatus = new TSStatus[totalRowCount];
+ Arrays.fill(tmpSubTsStatus, RpcUtils.SUCCESS_STATUS);
+ // Arrays.asList yields a fixed-size list; set() below is still supported on it
+ executeTsStatus.subStatus = Arrays.asList(tmpSubTsStatus);
+ }
+ // NOTE(review): set() throws if an existing subStatus list is shorter than a recorded
+ // index -- confirm the executor always reports one sub-status per row.
+ for (Entry<Integer, TSStatus> entry : checkTsStatus.entrySet()) {
+ executeTsStatus.subStatus.set(entry.getKey(), entry.getValue());
+ }
+ return RpcUtils.getStatus(executeTsStatus.subStatus);
}
@Override
@@ -1229,10 +1252,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
req.getTimestamps().get(0));
}
- List<TSStatus> statusList = new ArrayList<>();
- InsertRowPlan plan = new InsertRowPlan();
- boolean isAllSuccessful = true;
+ boolean allSuccess = true;
+ InsertRowsPlan insertRowsPlan = new InsertRowsPlan();
for (int i = 0; i < req.deviceIds.size(); i++) {
+ InsertRowPlan plan = new InsertRowPlan();
try {
plan.setDeviceId(new PartialPath(req.getDeviceIds().get(i)));
plan.setTime(req.getTimestamps().get(i));
@@ -1241,24 +1264,24 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
plan.setValues(req.getValuesList().get(i).toArray(new Object[0]));
plan.setNeedInferType(true);
TSStatus status = checkAuthority(plan, req.getSessionId());
- if (status == null) {
- status = executeNonQueryPlan(plan);
- isAllSuccessful =
- ((status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
- && isAllSuccessful);
+ if (status != null) {
+ insertRowsPlan.getResults().put(i, status);
}
- statusList.add(status);
+ insertRowsPlan.addOneInsertRowPlan(plan, i);
} catch (Exception e) {
- isAllSuccessful = false;
- statusList.add(
- onNPEOrUnexpectedException(
- e, "inserting string records", TSStatusCode.INTERNAL_SERVER_ERROR));
+ insertRowsPlan
+ .getResults()
+ .put(
+ i,
+ onNPEOrUnexpectedException(
+ e, "inserting string records", TSStatusCode.INTERNAL_SERVER_ERROR));
+ allSuccess = false;
}
}
+ TSStatus tsStatus = executeNonQueryPlan(insertRowsPlan);
- return isAllSuccessful
- ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
- : RpcUtils.getStatus(statusList);
+ return judgeFinalTsStatus(
+ allSuccess, tsStatus, insertRowsPlan.getResults(), req.deviceIds.size());
}
@Override