You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/24 05:48:12 UTC
[iotdb] 01/01: add insertOneDeviceRecords API in java session
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch new_insert_api
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 906dc397286e643a3eae229efe6ac7c8786f5ac7
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Dec 24 13:47:31 2020 +0800
add insertOneDeviceRecords API in java session
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 18 +++
.../engine/storagegroup/StorageGroupProcessor.java | 44 ++++++
.../apache/iotdb/db/qp/executor/IPlanExecutor.java | 8 ++
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 59 +++++++++
.../org/apache/iotdb/db/qp/logical/Operator.java | 3 +-
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 3 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 31 +++++
.../physical/crud/InsertRowsOfOneDevicePlan.java | 147 +++++++++++++++++++++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 69 +++++++---
.../crud/InsertRowsOfOneDevicePlanTest.java | 25 ++++
.../java/org/apache/iotdb/session/Session.java | 130 +++++++++++++++++-
.../org/apache/iotdb/session/pool/SessionPool.java | 57 ++++++++
.../iotdb/session/IoTDBSessionComplexIT.java | 1 +
.../apache/iotdb/session/IoTDBSessionSimpleIT.java | 62 +++++++++
thrift/src/main/thrift/rpc.thrift | 12 ++
15 files changed, 644 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6f796b0..8bd7155 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryFileManager;
@@ -120,6 +121,8 @@ public class StorageEngine implements IService {
private ExecutorService recoverAllSgThreadPool;
+
+
static class InstanceHolder {
private InstanceHolder() {
@@ -381,6 +384,21 @@ public class StorageEngine implements IService {
}
}
+ public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+ throws StorageEngineException {
+ StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowsOfOneDevicePlan.getDeviceId());
+
+ // TODO monitor: update statistics
+ try {
+ storageGroupProcessor.insert(insertRowsOfOneDevicePlan);
+ } catch (WriteProcessException e) {
+ throw new StorageEngineException(e);
+ }
+
+
+ }
+
+
/**
* insert a InsertTabletPlan to a storage group
*
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bfe409d..6f854ee 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryFileManager;
@@ -2492,6 +2493,49 @@ public class StorageGroupProcessor {
return tsFileManagement;
}
+ public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+ throws WriteProcessException {
+
+ if (enableMemControl) {
+ StorageEngine.blockInsertionIfReject();
+ }
+ writeLock();
+ try {
+ boolean isSequence = false;
+ for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+ if (!isAlive(plan.getTime())) {
+ //we do not need to write these part of data, as they can not be queried
+ continue;
+ }
+ // init map
+ long timePartitionId = StorageEngine.getTimePartition(plan.getTime());
+
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>());
+ //as the plans have been ordered, and we have get the write lock,
+ //So, if a plan is sequenced, then all the rest plans are sequenced.
+ //
+ if (!isSequence) {
+ isSequence =
+ plan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
+ .getOrDefault(plan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+ }
+ //is unsequence and user set config to discard out of order data
+ if (!isSequence && IoTDBDescriptor.getInstance().getConfig()
+ .isEnableDiscardOutOfOrderData()) {
+ return;
+ }
+
+ latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
+ // insert to sequence or unSequence file
+ insertToTsFileProcessor(plan, isSequence);
+ }
+ } finally {
+ writeUnlock();
+ }
+
+ }
+
private enum LoadTsFileType {
LOAD_SEQUENCE, LOAD_UNSEQUENCE
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 744e57b..7a893af 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -92,6 +93,13 @@ public interface IPlanExecutor {
void insert(InsertRowPlan insertRowPlan) throws QueryProcessException;
/**
+ * execute insert command and return whether the operator is successful.
+ *
+ * @param insertRowsOfOneDevicePlan physical insert plan
+ */
+ void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException;
+
+ /**
* execute batch insert plan
*
* @return result of each row
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 66a6dcf..cfc251c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -90,6 +90,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -195,6 +196,9 @@ public class PlanExecutor implements IPlanExecutor {
case INSERT:
insert((InsertRowPlan) plan);
return true;
+ case BATCH_INSERT_ONE_DEVICE:
+ insert((InsertRowsOfOneDevicePlan) plan);
+ return true;
case BATCHINSERT:
insertTablet((InsertTabletPlan) plan);
return true;
@@ -876,6 +880,7 @@ public class PlanExecutor implements IPlanExecutor {
insertRowPlan
.setMeasurementMNodes(new MeasurementMNode[insertRowPlan.getMeasurements().length]);
+ // check whether types are match
getSeriesSchemas(insertRowPlan);
insertRowPlan.transferType();
StorageEngine.getInstance().insert(insertRowPlan);
@@ -907,6 +912,60 @@ public class PlanExecutor implements IPlanExecutor {
}
@Override
+ public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+ throws QueryProcessException {
+ try {
+ for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+ plan.setMeasurementMNodes(new MeasurementMNode[plan.getMeasurements().length]);
+ // check whether types are match
+ getSeriesSchemas(plan);
+ //we do not need to infer data type for insertRowsOfOneDevicePlan
+ }
+ //ok, we can begin to write data into the engine..
+ StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
+
+ List<String> notExistedPaths = null;
+ List<String> failedMeasurements = null;
+ for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+ if (plan.getFailedMeasurements() != null) {
+ if (notExistedPaths == null) {
+ notExistedPaths = new ArrayList<>();
+ failedMeasurements = new ArrayList<>();
+ }
+ // check if all path not exist exceptions
+ List<String> failedPaths = plan.getFailedMeasurements();
+ List<Exception> exceptions = plan.getFailedExceptions();
+ boolean isPathNotExistException = true;
+ for (Exception e : exceptions) {
+ Throwable curException = e;
+ while (curException.getCause() != null) {
+ curException = curException.getCause();
+ }
+ if (!(curException instanceof PathNotExistException)) {
+ isPathNotExistException = false;
+ break;
+ }
+ }
+ if (isPathNotExistException) {
+ notExistedPaths.addAll(failedPaths);
+ } else {
+ failedMeasurements.addAll(plan.getFailedMeasurements());
+ }
+ }
+ }
+ if (notExistedPaths != null && !notExistedPaths.isEmpty()) {
+ throw new PathNotExistException(notExistedPaths);
+ } else if (notExistedPaths != null && !failedMeasurements.isEmpty()) {
+ throw new StorageEngineException(
+ "failed to insert points " + failedMeasurements);
+ }
+
+ } catch (StorageEngineException | MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ }
+
+ @Override
public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
try {
insertTabletPlan
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index b268365..9439913 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -77,6 +77,7 @@ public abstract class Operator {
TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES, REMOVE_FILE, MOVE_FILE, LAST, GROUP_BY_FILL,
ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE,
SHOW_MERGE_STATUS, CREATE_SCHEMA_SNAPSHOT, TRACING, DELETE_PARTITION,
- CREATE_MULTI_TIMESERIES
+ CREATE_MULTI_TIMESERIES,
+ BATCH_INSERT_ONE_DEVICE,
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 649bbac..c2842f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -298,7 +298,8 @@ public abstract class PhysicalPlan {
REVOKE_WATERMARK_EMBEDDING, CREATE_ROLE, DELETE_ROLE, CREATE_USER, REVOKE_USER_ROLE, REVOKE_ROLE_PRIVILEGE,
REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER,
DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, MULTI_CREATE_TIMESERIES,
- ALTER_TIMESERIES, FLUSH
+ ALTER_TIMESERIES, FLUSH,
+ BATCH_INSERT_ONE_DEVICE,
}
public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index b41311e..3f96479 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -90,6 +90,20 @@ public class InsertRowPlan extends InsertPlan {
isNeedInferType = true;
}
+
+ public InsertRowPlan(PartialPath deviceId, long insertTime, String[] measurementList,
+ ByteBuffer values) throws QueryProcessException {
+ super(Operator.OperatorType.INSERT);
+ this.time = insertTime;
+ this.deviceId = deviceId;
+ this.measurements = measurementList;
+ this.dataTypes = new TSDataType[measurementList.length];
+ this.values = new Object[measurementList.length];
+ this.fillValues(values);
+ isNeedInferType = false;
+ }
+
+
@TestOnly
public InsertRowPlan(PartialPath deviceId, long insertTime, String[] measurements,
TSDataType[] dataTypes, String[] insertValues) {
@@ -257,6 +271,10 @@ public class InsertRowPlan extends InsertPlan {
putString(stream, deviceId.getFullPath());
+ serializeMeasurementsAndValues(stream);
+ }
+
+ void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException {
stream.writeInt(
measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
@@ -278,6 +296,7 @@ public class InsertRowPlan extends InsertPlan {
stream.writeLong(index);
}
+
private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
for (int i = 0; i < values.length; i++) {
// types are not determined, the situation mainly occurs when the plan uses string values
@@ -397,6 +416,10 @@ public class InsertRowPlan extends InsertPlan {
putString(buffer, deviceId.getFullPath());
+ serializeMeasurementsAndValues(buffer);
+ }
+
+ void serializeMeasurementsAndValues(ByteBuffer buffer) {
buffer
.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
@@ -422,6 +445,10 @@ public class InsertRowPlan extends InsertPlan {
this.time = buffer.getLong();
this.deviceId = new PartialPath(readString(buffer));
+ deserializeMeasurementsAndValues(buffer);
+ }
+
+ void deserializeMeasurementsAndValues(ByteBuffer buffer) {
int measurementSize = buffer.getInt();
this.measurements = new String[measurementSize];
@@ -464,4 +491,8 @@ public class InsertRowPlan extends InsertPlan {
failedValues = null;
return this;
}
+
+ boolean hasFailedValues() {
+ return failedValues != null && !failedValues.isEmpty();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
new file mode 100644
index 0000000..0bc6592
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.crud;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+
+public class InsertRowsOfOneDevicePlan extends InsertPlan {
+
+ private InsertRowPlan[] rowPlans;
+
+ public InsertRowsOfOneDevicePlan(PartialPath deviceId, Long[] insertTimes,
+ List<List<String>> measurements,
+ ByteBuffer[] insertValues) throws QueryProcessException {
+ super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+ this.deviceId = deviceId;
+ rowPlans = new InsertRowPlan[insertTimes.length];
+ for (int i = 0; i < insertTimes.length; i++) {
+ rowPlans[i] = new InsertRowPlan(deviceId, insertTimes[i], measurements.get(i).toArray(new String[0]), insertValues[i]);
+ if (rowPlans[i].getMeasurements().length == 0) {
+ throw new QueryProcessException(
+ "The measurements are null, deviceId:" + deviceId
+ + ", time:" + insertTimes[i]);
+ }
+ if (rowPlans[i].getValues().length == 0) {
+ throw new QueryProcessException(
+ "The size of values in InsertRowsOfOneDevicePlan is 0, deviceId:" + deviceId
+ + ", time:" + insertTimes[i]);
+ }
+ }
+ }
+
+ //TODO do we need to rewrite hashCode?
+
+ @Override
+ public List<PartialPath> getPaths() {
+ Set<PartialPath> paths = new HashSet<>();
+ for (InsertRowPlan plan : rowPlans) {
+ paths.addAll(plan.getPaths());
+ }
+ return new ArrayList<>(paths);
+ }
+
+ @Override
+ public long getMinTime() {
+ long minTime = Long.MAX_VALUE;
+ for (InsertRowPlan plan : rowPlans) {
+ if (minTime > plan.getTime()) {
+ minTime = plan.getTime();
+ }
+ }
+ return minTime;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
+ stream.writeByte((byte) type);
+ putString(stream, deviceId.getFullPath());
+
+ stream.writeInt(rowPlans.length);
+ for (InsertRowPlan plan : rowPlans) {
+ stream.writeLong(plan.getTime());
+ plan.serializeMeasurementsAndValues(stream);
+ }
+ }
+
+
+ @Override
+ public void serialize(ByteBuffer buffer) {
+ int type = PhysicalPlanType.INSERT.ordinal();
+ buffer.put((byte) type);
+
+ putString(buffer, deviceId.getFullPath());
+
+ buffer.putInt(rowPlans.length);
+ for (InsertRowPlan plan : rowPlans) {
+ buffer.putLong(plan.getTime());
+ plan.serializeMeasurementsAndValues(buffer);
+ }
+ }
+
+ @Override
+ public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+
+ this.deviceId = new PartialPath(readString(buffer));
+ this.rowPlans = new InsertRowPlan[buffer.getInt()];
+ for (int i = 0; i < rowPlans.length; i ++) {
+ rowPlans[i] = new InsertRowPlan();
+ rowPlans[i].setDeviceId(deviceId);
+ rowPlans[i].setTime(buffer.getLong());
+ rowPlans[i].deserializeMeasurementsAndValues(buffer);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "deviceId: " + deviceId + ", times: " + rowPlans.length ;
+ }
+
+
+ @Override
+ public InsertPlan getPlanFromFailed() {
+ if (super.getPlanFromFailed() == null) {
+ return null;
+ }
+ List<InsertRowPlan> plans = new ArrayList<>();
+ for (InsertRowPlan plan : rowPlans) {
+ if (plan.hasFailedValues()) {
+ plans.add((InsertRowPlan) plan.getPlanFromFailed());
+ }
+ }
+ this.rowPlans = plans.toArray(new InsertRowPlan[0]);
+ return this;
+ }
+
+
+ public InsertRowPlan[] getRowPlans() {
+ return rowPlans;
+ }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 76f4f59..7f2bf8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -114,6 +115,7 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertOneDeviceRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
@@ -1256,14 +1258,47 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
for (int i = 0; i < req.deviceIds.size(); i++) {
try {
- InsertRowPlan plan = new InsertRowPlan();
- plan.setDeviceId(new PartialPath(req.getDeviceIds().get(i)));
- plan.setTime(req.getTimestamps().get(i));
- plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
- plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
- plan.setValues(new Object[plan.getMeasurements().length]);
- plan.fillValues(req.valuesList.get(i));
- plan.setNeedInferType(false);
+ InsertRowPlan plan = new InsertRowPlan(
+ new PartialPath(req.getDeviceIds().get(i)), req.getTimestamps().get(i),
+ req.getMeasurementsList().get(i).toArray(new String[0]), req.valuesList.get(i)
+ );
+ TSStatus status = checkAuthority(plan, req.getSessionId());
+ if (status != null) {
+ statusList.add(status);
+ } else {
+ statusList.add(executeNonQueryPlan(plan));
+ }
+ } catch (Exception e) {
+ logger.error("meet error when insert in batch", e);
+ statusList.add(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ }
+ }
+
+ return RpcUtils.getStatus(statusList);
+ }
+
+ @Override
+ public TSStatus insertOneDeviceRecords(TSInsertOneDeviceRecordsReq req) throws TException {
+ if (auditLogger.isDebugEnabled()) {
+ auditLogger
+ .debug("Session {} insertRecords, device {}, first time {}", currSessionId.get(),
+ req.deviceId, req.getTimestamps().get(0));
+ }
+ if (!checkLogin(req.getSessionId())) {
+ logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+ return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+ }
+
+ List<TSStatus> statusList = new ArrayList<>();
+
+ for (int i = 0; i < req.measurementsList.size(); i++) {
+ try {
+ InsertRowsOfOneDevicePlan plan = new InsertRowsOfOneDevicePlan(
+ new PartialPath(req.getDeviceId()),
+ req.getTimestamps().toArray(new Long[0]),
+ req.getMeasurementsList(),
+ req.getValuesList().toArray(new ByteBuffer[0])
+ );
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
statusList.add(status);
@@ -1348,6 +1383,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
@Override
+ public TSStatus testInsertOneDeviceRecords(TSInsertOneDeviceRecordsReq req) throws TException {
+ logger.debug("Test insert rows in batch request receive.");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ @Override
public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) throws TException {
logger.debug("Test insert string records request receive.");
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
@@ -1364,14 +1405,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
- InsertRowPlan plan = new InsertRowPlan();
- plan.setDeviceId(new PartialPath(req.getDeviceId()));
- plan.setTime(req.getTimestamp());
- plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
- plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
- plan.setValues(new Object[plan.getMeasurements().length]);
- plan.fillValues(req.values);
- plan.setNeedInferType(false);
+ InsertRowPlan plan = new InsertRowPlan(
+ new PartialPath(req.getDeviceId()), req.getTimestamp(),
+ req.getMeasurements().toArray(new String[0]), req.values
+ );
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlanTest.java
new file mode 100644
index 0000000..f7aed72
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlanTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.crud;
+
+public class InsertRowsOfOneDevicePlanTest {
+
+
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index d4abb13..8e59b6f 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -36,6 +37,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertOneDeviceRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
@@ -379,6 +381,117 @@ public class Session {
}
}
+ /**
+ * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
+ * executeBatch, we pack some insert request in batch and send them to server. If you want improve
+ * your performance, please see insertTablet method
+ * <p>
+ * Each row is independent, which could have different deviceId, time, number of measurements
+ *
+ * @see Session#insertTablet(Tablet)
+ */
+ public void insertOneDeviceRecords(String deviceId, List<Long> times,
+ List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList)
+ throws IoTDBConnectionException, StatementExecutionException {
+ insertOneDeviceRecords(deviceId, times, measurementsList, typesList, valuesList, false);
+ }
+ /**
+ * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
+ * executeBatch, we pack some insert request in batch and send them to server. If you want improve
+ * your performance, please see insertTablet method
+ * <p>
+ * Each row is independent, which could have different deviceId, time, number of measurements
+ *
+ * @param haveSorted whether the times have been sorted
+ * @see Session#insertTablet(Tablet)
+ */
+ public void insertOneDeviceRecords(String deviceId, List<Long> times,
+ List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList, boolean haveSorted)
+ throws IoTDBConnectionException, StatementExecutionException {
+ TSInsertOneDeviceRecordsReq request = genTSInsertOneDeviceRecordsReq(deviceId, times, measurementsList,
+ typesList, valuesList, haveSorted);
+ try {
+ RpcUtils.verifySuccess(client.insertOneDeviceRecords(request));
+ } catch (TException e) {
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertOneDeviceRecords(request));
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException(
+ "Fail to reconnect to server. Please check server status");
+ }
+ }
+ }
+
+ private TSInsertOneDeviceRecordsReq genTSInsertOneDeviceRecordsReq(String deviceId, List<Long> times,
+ List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, BatchExecutionException {
+ // check params size
+ int len = times.size();
+ if (len != measurementsList.size() || len != valuesList.size()) {
+ throw new IllegalArgumentException(
+ "times, measurementsList and valuesList's size should be equal");
+ }
+
+ if (haveSorted) {
+ if (!checkSorted(times)) {
+ throw new BatchExecutionException("Times in InsertOneDeviceRecords are not in ascending order");
+ }
+ } else {
+ sortRecordsInOneDevice(times, measurementsList, typesList, valuesList);
+ }
+
+ TSInsertOneDeviceRecordsReq request = new TSInsertOneDeviceRecordsReq();
+ request.setSessionId(sessionId);
+ request.setDeviceId(deviceId);
+ request.setTimestamps(times);
+ request.setMeasurementsList(measurementsList);
+ List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
+ request.setValuesList(buffersList);
+ return request;
+ }
+
+ private void sortRecordsInOneDevice(List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
+ Integer[] index = new Integer[times.size()];
+ for (int i = 0; i < times.size(); i++) {
+ index[i] = i;
+ }
+ Arrays.sort(index, Comparator.comparingLong(times::get));
+ times.sort(Long::compareTo);
+ //sort measurementList
+ sortList(measurementsList, index);
+ //sort typesList
+ sortList(typesList, index);
+ //sort values
+ sortList(valuesList, index);
+ }
+
+ private void sortList(List source, Integer[] index) {
+ List tmpLists = new ArrayList<>(source);
+
+ for (int i = 0; i < index.length; i++) {
+ source.set(i, tmpLists.get(index[i]));
+ }
+ }
+
+ private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList, List<List<TSDataType>> typesList)
+ throws IoTDBConnectionException {
+ List<ByteBuffer> buffersList = new ArrayList<>();
+ for (int i = 0; i < valuesList.size(); i++) {
+ ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
+ putValues(typesList.get(i), valuesList.get(i), buffer);
+ buffer.flip();
+ buffersList.add(buffer);
+ }
+ return buffersList;
+ }
+
+
private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList) throws IoTDBConnectionException {
@@ -394,13 +507,7 @@ public class Session {
request.setDeviceIds(deviceIds);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
- List<ByteBuffer> buffersList = new ArrayList<>();
- for (int i = 0; i < measurementsList.size(); i++) {
- ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
- putValues(typesList.get(i), valuesList.get(i), buffer);
- buffer.flip();
- buffersList.add(buffer);
- }
+ List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
request.setValuesList(buffersList);
return request;
}
@@ -1064,6 +1171,15 @@ public class Session {
return true;
}
+ private boolean checkSorted(List<Long> times) {
+ for (int i = 1; i < times.size(); i++) {
+ if (times.get(i) < times.get(i - 1)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
public void sortTablet(Tablet tablet) {
/*
* following part of code sort the batch data by time,
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 0f8bb56..72ac8d1 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -415,6 +415,63 @@ public class SessionPool {
/**
+ * Insert data that belong to the same device in batch format, which can reduce the overhead of
+ * network. This method is just like jdbc batch insert, we pack some insert request in batch and
+ * send them to server If you want
+ * improve your performance, please see insertTablet method
+ *
+ * @see Session#insertTablet(Tablet)
+ */
+ public void insertOneDeviceRecords(String deviceId, List<Long> times,
+ List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList) throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.insertOneDeviceRecords(deviceId, times, measurementsList, typesList, valuesList, false);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ logger.warn("insertOneDeviceRecords failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Insert data that belong to the same device in batch format, which can reduce the overhead of
+ * network. This method is just like jdbc batch insert, we pack some insert request in batch and
+ * send them to server If you want
+ * improve your performance, please see insertTablet method
+ * @param haveSorted whether the times list has been ordered.
+ * @see Session#insertTablet(Tablet)
+ */
+ public void insertOneDeviceRecords(String deviceId, List<Long> times,
+ List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.insertOneDeviceRecords(deviceId, times, measurementsList, typesList, valuesList, haveSorted);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ logger.warn("insertOneDeviceRecords failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+
+
+ /**
* Insert data in batch format, which can reduce the overhead of network. This method is just like
* jdbc batch insert, we pack some insert request in batch and send them to server If you want
* improve your performance, please see insertTablet method
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
index ad59aea..a0b6123 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
@@ -779,4 +779,5 @@ public class IoTDBSessionComplexIT {
Assert.assertEquals(700, count);
}
}
+
}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
index 09849c8..f220234 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
@@ -388,4 +388,66 @@ public class IoTDBSessionSimpleIT {
session.deleteStorageGroup(storageGroup);
session.close();
}
+
+ @Test
+ public void testInsertOneDeviceRecords()
+ throws IoTDBConnectionException, StatementExecutionException {
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+ List<Long> times = new ArrayList<>();
+ List<List<String>> measurements = new ArrayList<>();
+ List<List<TSDataType>> datatypes = new ArrayList<>();
+ List<List<Object>> values = new ArrayList<>();
+
+ List<String> tmpMeasurements = null;
+ List<TSDataType> tmpDataTypes = null;
+ List<Object> tmpValues = null;
+
+ times.add(1L);
+ tmpMeasurements = new ArrayList<>();
+ tmpDataTypes = new ArrayList<>();
+ tmpValues = new ArrayList<>();
+ tmpMeasurements.add("s1");
+ tmpMeasurements.add("s2");
+ tmpDataTypes.add(TSDataType.INT32);
+ tmpDataTypes.add(TSDataType.INT32);
+ tmpValues.add(1);
+ tmpValues.add(2);
+ measurements.add(tmpMeasurements);
+ datatypes.add(tmpDataTypes);
+ values.add(tmpValues);
+
+
+ times.add(2L);
+ tmpMeasurements = new ArrayList<>();
+ tmpDataTypes = new ArrayList<>();
+ tmpValues = new ArrayList<>();
+ tmpMeasurements.add("s2");
+ tmpMeasurements.add("s3");
+ tmpDataTypes.add(TSDataType.INT32);
+ tmpDataTypes.add(TSDataType.INT64);
+ tmpValues.add(3);
+ tmpValues.add(4L);
+ measurements.add(tmpMeasurements);
+ datatypes.add(tmpDataTypes);
+ values.add(tmpValues);
+
+ times.add(3L);
+ tmpMeasurements = new ArrayList<>();
+ tmpDataTypes = new ArrayList<>();
+ tmpValues = new ArrayList<>();
+ tmpMeasurements.add("s4");
+ tmpMeasurements.add("s5");
+ tmpDataTypes.add(TSDataType.FLOAT);
+ tmpDataTypes.add(TSDataType.BOOLEAN);
+ tmpValues.add(5.0f);
+ tmpValues.add(Boolean.TRUE);
+ measurements.add(tmpMeasurements);
+ datatypes.add(tmpDataTypes);
+ values.add(tmpValues);
+
+ session.insertOneDeviceRecords("root.sg.d1", times, measurements, datatypes, values);
+ session.close();
+
+ }
}
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 4f412bf..45eda3d 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -225,6 +225,14 @@ struct TSInsertRecordsReq {
5: required list<i64> timestamps
}
+struct TSInsertOneDeviceRecordsReq {
+ 1: required i64 sessionId
+ 2: required string deviceId
+ 3: required list<list<string>> measurementsList
+ 4: required list<binary> valuesList
+ 5: required list<i64> timestamps
+}
+
struct TSInsertStringRecordsReq {
1: required i64 sessionId
2: required list<string> deviceIds
@@ -326,6 +334,8 @@ service TSIService {
TSStatus insertRecords(1:TSInsertRecordsReq req);
+ TSStatus insertOneDeviceRecords(1:TSInsertOneDeviceRecordsReq req);
+
TSStatus insertStringRecords(1:TSInsertStringRecordsReq req);
TSStatus testInsertTablet(1:TSInsertTabletReq req);
@@ -338,6 +348,8 @@ service TSIService {
TSStatus testInsertRecords(1:TSInsertRecordsReq req);
+ TSStatus testInsertOneDeviceRecords(1:TSInsertOneDeviceRecordsReq req);
+
TSStatus testInsertStringRecords(1:TSInsertStringRecordsReq req);
TSStatus deleteData(1:TSDeleteDataReq req);