You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/02 06:42:06 UTC
[incubator-iotdb] 01/01: add insertStringRecord rpc method
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_insert_row_plan_type
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 635204afb6fed922e26a4af1ab6c42ac95ca503f
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Jul 2 14:41:49 2020 +0800
add insertStringRecord rpc method
---
.../org/apache/iotdb/db/metadata/MManager.java | 25 ++----
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 6 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 15 ++--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 88 +++++++++++++++++++++-
service-rpc/rpc-changelist.md | 3 +
service-rpc/src/main/thrift/rpc.thrift | 26 ++++++-
.../java/org/apache/iotdb/session/Session.java | 63 ++++++----------
7 files changed, 150 insertions(+), 76 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 57eb238..a431c07 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -1835,13 +1834,10 @@ public class MManager {
/**
* get schema for device.
* Attention!!! Only support insertPlan
- * @param deviceId
- * @param measurementList
- * @param plan
- * @return
* @throws MetadataException
*/
- public MeasurementSchema[] getSeriesSchemasAndLock(String deviceId, String[] measurementList, PhysicalPlan plan) throws MetadataException {
+ public MeasurementSchema[] getSeriesSchemasAndReadLockDevice(String deviceId,
+ String[] measurementList, InsertPlan plan) throws MetadataException {
MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
MNode deviceNode;
@@ -1895,9 +1891,7 @@ public class MManager {
measurementList[i], insertDataType, measurementNode.getSchema().getType()));
} else {
// mark failed measurement
- if( plan instanceof InsertPlan){
- ((InsertPlan) plan).markFailedMeasurementInsertion(i);
- }
+ plan.markFailedMeasurementInsertion(i);
continue;
}
}
@@ -1911,9 +1905,7 @@ public class MManager {
e.getMessage());
if (config.isEnablePartialInsert()) {
// mark failed measurement
- if (plan instanceof InsertPlan) {
- ((InsertPlan) plan).markFailedMeasurementInsertion(i);
- }
+ plan.markFailedMeasurementInsertion(i);
} else {
throw e;
}
@@ -1949,18 +1941,15 @@ public class MManager {
/**
* get dataType of plan, in loc measurements
* only support InsertRowPlan and InsertTabletPlan
- * @param plan
- * @param loc
- * @return
* @throws MetadataException
*/
- private TSDataType getTypeInLoc(PhysicalPlan plan, int loc) throws MetadataException {
+ private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException {
TSDataType dataType;
if (plan instanceof InsertRowPlan) {
InsertRowPlan tPlan = (InsertRowPlan) plan;
dataType = TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
} else if (plan instanceof InsertTabletPlan) {
- dataType = ((InsertTabletPlan) plan).getDataTypes()[loc];
+ dataType = (plan).getDataTypes()[loc];
} else {
throw new MetadataException(String.format(
"Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
@@ -1973,7 +1962,7 @@ public class MManager {
* after insert, we should call this function to unlock the device node
* @param deviceId
*/
- public void unlockInsert(String deviceId) {
+ public void unlockDeviceReadLock(String deviceId) {
try {
MNode mNode = getDeviceNode(deviceId);
mNode.readUnlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index a283744..40593f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -867,7 +867,7 @@ public class PlanExecutor implements IPlanExecutor {
protected MeasurementSchema[] getSeriesSchemas(InsertPlan insertPlan)
throws MetadataException {
- return mManager.getSeriesSchemasAndLock(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
+ return mManager.getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
}
@Override
@@ -883,7 +883,7 @@ public class PlanExecutor implements IPlanExecutor {
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
} finally {
- mManager.unlockInsert(insertRowPlan.getDeviceId());
+ mManager.unlockDeviceReadLock(insertRowPlan.getDeviceId());
}
}
@@ -900,7 +900,7 @@ public class PlanExecutor implements IPlanExecutor {
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
} finally {
- mManager.unlockInsert(insertTabletPlan.getDeviceId());
+ mManager.unlockDeviceReadLock(insertTabletPlan.getDeviceId());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index fc26300..0bef4f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -62,8 +62,7 @@ public class InsertRowPlan extends InsertPlan {
@TestOnly
public InsertRowPlan(String deviceId, long insertTime, String[] measurements,
- TSDataType[] dataTypes,
- String[] insertValues) {
+ TSDataType[] dataTypes, String[] insertValues) {
super(OperatorType.INSERT);
this.time = insertTime;
this.deviceId = deviceId;
@@ -113,8 +112,7 @@ public class InsertRowPlan extends InsertPlan {
}
public InsertRowPlan(String deviceId, long insertTime, String[] measurementList,
- TSDataType[] dataTypes,
- Object[] insertValues) {
+ TSDataType[] dataTypes, Object[] insertValues) {
super(Operator.OperatorType.INSERT);
this.time = insertTime;
this.deviceId = deviceId;
@@ -129,8 +127,8 @@ public class InsertRowPlan extends InsertPlan {
this.time = insertTime;
this.deviceId = deviceId;
this.measurements = measurementList;
- // build types and values
this.dataTypes = new TSDataType[measurements.length];
+ // We need to create an Object[] for the data type casting, because we can not set Float, Long to String[i]
this.values = new Object[measurements.length];
System.arraycopy(insertValues, 0, values, 0, measurements.length);
isNeedInferType = true;
@@ -322,7 +320,10 @@ public class InsertRowPlan extends InsertPlan {
}
}
- public void setValues(ByteBuffer buffer) throws QueryProcessException {
+ /**
+ * Make sure the values is already inited before calling this
+ */
+ public void fillValues(ByteBuffer buffer) throws QueryProcessException {
for (int i = 0; i < measurements.length; i++) {
dataTypes[i] = ReadWriteIOUtils.readDataType(buffer);
switch (dataTypes[i]) {
@@ -389,7 +390,7 @@ public class InsertRowPlan extends InsertPlan {
this.dataTypes = new TSDataType[measurementSize];
this.values = new Object[measurementSize];
try {
- setValues(buffer);
+ fillValues(buffer);
} catch (QueryProcessException e) {
e.printStackTrace();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 74edd18..3f95840 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1087,8 +1087,46 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
plan.setValues(new Object[plan.getMeasurements().length]);
- plan.setValues(req.valuesList.get(i));
- plan.setNeedInferType(req.isInferType());
+ plan.fillValues(req.valuesList.get(i));
+ plan.setNeedInferType(false);
+ TSStatus status = checkAuthority(plan, req.getSessionId());
+ if (status != null) {
+ statusList.add(status);
+ } else {
+ statusList.add(executePlan(plan));
+ }
+ } catch (Exception e) {
+ logger.error("meet error when insert in batch", e);
+ statusList.add(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ }
+ }
+
+ return RpcUtils.getStatus(statusList);
+ }
+
+ @Override
+ public TSStatus insertStringRecords(TSInsertStringRecordsReq req) throws TException {
+ if (auditLogger.isDebugEnabled()) {
+ auditLogger
+ .debug("Session {} insertRecords, first device {}, first time {}", currSessionId.get(),
+ req.deviceIds.get(0), req.getTimestamps().get(0));
+ }
+ if (!checkLogin(req.getSessionId())) {
+ logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+ return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+ }
+
+ List<TSStatus> statusList = new ArrayList<>();
+ InsertRowPlan plan = new InsertRowPlan();
+ for (int i = 0; i < req.deviceIds.size(); i++) {
+ try {
+ plan.setDeviceId(req.getDeviceIds().get(i));
+ plan.setTime(req.getTimestamps().get(i));
+ plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
+ plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
+ plan.setValues(
+ req.getValuesList().get(i).toArray(new Object[req.getValuesList().get(i).size()]));
+ plan.setNeedInferType(true);
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
statusList.add(status);
@@ -1123,12 +1161,24 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
@Override
+ public TSStatus testInsertStringRecord(TSInsertStringRecordReq req) throws TException {
+ logger.debug("Test insert string record request receive.");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ @Override
public TSStatus testInsertRecords(TSInsertRecordsReq req) {
logger.debug("Test insert row in batch request receive.");
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
+ public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) throws TException {
+ logger.debug("Test insert string records request receive.");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ @Override
public TSStatus insertRecord(TSInsertRecordReq req) {
try {
auditLogger
@@ -1145,8 +1195,38 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
plan.setValues(new Object[plan.getMeasurements().length]);
- plan.setValues(req.values);
- plan.setNeedInferType(req.isInferType());
+ plan.fillValues(req.values);
+ plan.setNeedInferType(false);
+
+ TSStatus status = checkAuthority(plan, req.getSessionId());
+ if (status != null) {
+ return status;
+ }
+ return executePlan(plan);
+ } catch (Exception e) {
+ logger.error("meet error when insert", e);
+ }
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ }
+
+ @Override
+ public TSStatus insertStringRecord(TSInsertStringRecordReq req) throws TException {
+ try {
+ auditLogger
+ .debug("Session {} insertRecord, device {}, time {}", currSessionId.get(),
+ req.getDeviceId(), req.getTimestamp());
+ if (!checkLogin(req.getSessionId())) {
+ logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+ return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+ }
+
+ InsertRowPlan plan = new InsertRowPlan();
+ plan.setDeviceId(req.getDeviceId());
+ plan.setTime(req.getTimestamp());
+ plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
+ plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
+ plan.setValues(req.getValues().toArray(new Object[req.getValues().size()]));
+ plan.setNeedInferType(true);
TSStatus status = checkAuthority(plan, req.getSessionId());
if (status != null) {
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index aaf8ebe..5fc8cbf 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -36,6 +36,8 @@ Last Updated on 2020-6-29 by Xiangdong Huang.
| Latest Changes | Related Committers |
| ------------------------------------------------------------ | ---------------------- |
| set the input/output as TFramedTransport | Tian Jiang |
+| add TSInsertStringRecordReq and TSInsertStringRecordsReq struct | Jianlin Qiao |
+| add insertStringRecord, insertStringRecords and related test methods | Jianlin Qiao |
## 3. Update
@@ -44,6 +46,7 @@ Last Updated on 2020-6-29 by Xiangdong Huang.
| ------------------------------------------------------------ | ---------------------- |
| Add sub-status in TSStatus | Tian Jiang |
| Change the result of executeBatchStatement as TSStatus | Tian Jiang |
+| Remove inferType from TSInsertRecordReq | Jialin Qiao |
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index 64cb9ee..d7ab4bc 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -169,7 +169,14 @@ struct TSInsertRecordReq {
3: required list<string> measurements
4: required binary values
5: required i64 timestamp
- 6: optional bool inferType
+}
+
+struct TSInsertStringRecordReq {
+ 1: required i64 sessionId
+ 2: required string deviceId
+ 3: required list<string> measurements
+ 4: required list<string> values
+ 5: required i64 timestamp
}
struct TSInsertTabletReq {
@@ -198,7 +205,14 @@ struct TSInsertRecordsReq {
3: required list<list<string>> measurementsList
4: required list<binary> valuesList
5: required list<i64> timestamps
- 6: optional bool inferType
+}
+
+struct TSInsertStringRecordsReq {
+ 1: required i64 sessionId
+ 2: required list<string> deviceIds
+ 3: required list<list<string>> measurementsList
+ 4: required list<list<string>> valuesList
+ 5: required list<i64> timestamps
}
struct TSDeleteDataReq {
@@ -292,20 +306,28 @@ service TSIService {
TSStatus insertRecord(1:TSInsertRecordReq req);
+ TSStatus insertStringRecord(1:TSInsertStringRecordReq req);
+
TSStatus insertTablet(1:TSInsertTabletReq req);
TSStatus insertTablets(1:TSInsertTabletsReq req);
TSStatus insertRecords(1:TSInsertRecordsReq req);
+ TSStatus insertStringRecords(1:TSInsertStringRecordsReq req);
+
TSStatus testInsertTablet(1:TSInsertTabletReq req);
TSStatus testInsertTablets(1:TSInsertTabletsReq req);
TSStatus testInsertRecord(1:TSInsertRecordReq req);
+ TSStatus testInsertStringRecord(1:TSInsertStringRecordReq req);
+
TSStatus testInsertRecords(1:TSInsertRecordsReq req);
+ TSStatus testInsertStringRecords(1:TSInsertStringRecordsReq req);
+
TSStatus deleteData(1:TSDeleteDataReq req);
i64 requestStatementId(1:i64 sessionId);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 5bb573e..2f5119d 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
@@ -385,17 +387,17 @@ public class Session {
List<List<String>> measurementsList, List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordsReq request =genTSInsertRecordsReq(deviceIds, times, measurementsList, valuesList);
+ TSInsertStringRecordsReq request = genTSInsertStringRecordsReq(deviceIds, times,
+ measurementsList, valuesList);
try {
- RpcUtils.verifySuccess(client.insertRecords(request));
+ RpcUtils.verifySuccess(client.insertStringRecords(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
}
- private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, List<Long> times,
- List<List<String>> measurementsList, List<List<String>> valuesList)
- throws IoTDBConnectionException {
+ private TSInsertStringRecordsReq genTSInsertStringRecordsReq(List<String> deviceIds, List<Long> times,
+ List<List<String>> measurementsList, List<List<String>> valuesList) {
// check params size
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
@@ -403,21 +405,13 @@ public class Session {
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
- TSInsertRecordsReq request = new TSInsertRecordsReq();
+ TSInsertStringRecordsReq request = new TSInsertStringRecordsReq();
request.setSessionId(sessionId);
request.setDeviceIds(deviceIds);
request.setTimestamps(times);
request.setMeasurementsList(measurementsList);
- request.setInferType(true);
- List<ByteBuffer> buffersList = new ArrayList<>();
- for (int i = 0; i < measurementsList.size(); i++) {
- ByteBuffer buffer = ByteBuffer.allocate(calculateStrLength(valuesList.get(i)));
- putStrValues(valuesList.get(i), buffer);
- buffer.flip();
- buffersList.add(buffer);
- }
- request.setValuesList(buffersList);
- return request;
+ request.setValuesList(valuesList);
+ return request;
}
/**
@@ -463,41 +457,25 @@ public class Session {
public void insertRecord(String deviceId, long time, List<String> measurements,
List<String> values) throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, values);
+ TSInsertStringRecordReq request = genTSInsertStringRecordReq(deviceId, time, measurements, values);
try {
- RpcUtils.verifySuccess(client.insertRecord(request));
+ RpcUtils.verifySuccess(client.insertStringRecord(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
}
- private TSInsertRecordReq genTSInsertRecordReq(String deviceId, long time, List<String> measurements,
- List<String> values) throws IoTDBConnectionException {
- TSInsertRecordReq request = new TSInsertRecordReq();
+ private TSInsertStringRecordReq genTSInsertStringRecordReq(String deviceId, long time,
+ List<String> measurements, List<String> values) {
+ TSInsertStringRecordReq request = new TSInsertStringRecordReq();
request.setSessionId(sessionId);
request.setDeviceId(deviceId);
request.setTimestamp(time);
request.setMeasurements(measurements);
- request.setInferType(true);
- ByteBuffer buffer = ByteBuffer.allocate(calculateStrLength(values));
- putStrValues(values, buffer);
- buffer.flip();
- request.setValues(buffer);
+ request.setValues(values);
return request;
}
-
- private void putStrValues(List<String> values, ByteBuffer buffer)
- throws IoTDBConnectionException {
- for (int i = 0; i < values.size(); i++) {
- ReadWriteIOUtils.write(TSDataType.TEXT, buffer);
- byte[] bytes = ((String) values.get(i)).getBytes(TSFileConfig.STRING_CHARSET);
- ReadWriteIOUtils.write(bytes.length, buffer);
- buffer.put(bytes);
- }
- }
-
-
/**
* put value in buffer
*
@@ -651,10 +629,11 @@ public class Session {
public void testInsertRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList, valuesList);
+ TSInsertStringRecordsReq request = genTSInsertStringRecordsReq(deviceIds, times,
+ measurementsList, valuesList);
try {
- RpcUtils.verifySuccess(client.testInsertRecords(request));
+ RpcUtils.verifySuccess(client.testInsertStringRecords(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}
@@ -680,10 +659,10 @@ public class Session {
*/
public void testInsertRecord(String deviceId, long time, List<String> measurements,
List<String> values) throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, values);
+ TSInsertStringRecordReq request = genTSInsertStringRecordReq(deviceId, time, measurements, values);
try {
- RpcUtils.verifySuccess(client.testInsertRecord(request));
+ RpcUtils.verifySuccess(client.testInsertStringRecord(request));
} catch (TException e) {
throw new IoTDBConnectionException(e);
}