You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/15 14:33:43 UTC
[incubator-iotdb] 01/01: add insertInBatch in Session
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch add_insertInBatch
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 66aaf6502312db71a974fac774f13b5cd897abe2
Author: qiaojialin <64...@qq.com>
AuthorDate: Sun Mar 15 22:33:23 2020 +0800
add insertInBatch in Session
---
.../main/java/org/apache/iotdb/SessionExample.java | 34 +++++++++++
grafana/pom.xml | 2 +-
jdbc/pom.xml | 2 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 29 +++++++++
service-rpc/pom.xml | 2 +-
service-rpc/rpc-changelist.md | 2 +
service-rpc/src/main/thrift/rpc.thrift | 14 +++++
.../java/org/apache/iotdb/session/Session.java | 69 +++++++++++++++-------
.../org/apache/iotdb/session/IoTDBSessionIT.java | 64 +++++++++++++++++++-
9 files changed, 192 insertions(+), 26 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 8e41dbd..49c365e 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -49,6 +49,7 @@ public class SessionExample {
insert();
insertRowBatch();
+ insertInBatch();
nonQuery();
query();
deleteData();
@@ -101,6 +102,39 @@ public class SessionExample {
}
}
+ private static void insertInBatch() throws IoTDBSessionException {
+ String deviceId = "root.sg1.d2";
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ List<String> deviceIds = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<String>> valuesList = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+
+ for (long time = 0; time < 500; time++) {
+ List<String> values = new ArrayList<>();
+ values.add("1");
+ values.add("2");
+ values.add("3");
+
+ deviceIds.add(deviceId);
+ measurementsList.add(measurements);
+ valuesList.add(values);
+ timestamps.add(time);
+ if (time != 0 && time % 100 == 0) {
+ session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+ deviceIds.clear();
+ measurementsList.clear();
+ valuesList.clear();
+ timestamps.clear();
+ }
+ }
+
+ session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+ }
+
private static void deleteData() throws IoTDBSessionException {
String path = "root.sg1.d1.s1";
long deleteTime = 99;
diff --git a/grafana/pom.xml b/grafana/pom.xml
index 2a4f3e1..f50f0fd 100644
--- a/grafana/pom.xml
+++ b/grafana/pom.xml
@@ -165,7 +165,7 @@
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${start-class}</mainClass>
</transformer>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 36f373c..5e0b55f 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -126,7 +126,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
</pluginExecutions>
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index da61a4f..23c4368 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -90,6 +90,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteInsertRowInBatchResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
@@ -99,6 +100,7 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSHandleIdentifier;
import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertInBatchReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
@@ -513,6 +515,33 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
@Override
+ public TSExecuteInsertRowInBatchResp insertRowInBatch(TSInsertInBatchReq req) {
+ TSExecuteInsertRowInBatchResp resp = new TSExecuteInsertRowInBatchResp();
+ if (!checkLogin()) {
+ logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+ resp.addToStatusList(new TSStatus(getStatus(TSStatusCode.NOT_LOGIN_ERROR)));
+ return resp;
+ }
+
+ InsertPlan plan = new InsertPlan();
+ for (int i = 0; i < req.deviceIds.size(); i++) {
+ plan.setDeviceId(req.getDeviceIds().get(i));
+ plan.setTime(req.getTimestamps().get(i));
+ plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
+ plan.setValues(req.getValuesList().get(i).toArray(new String[0]));
+ TSStatus status = checkAuthority(plan);
+ if (status != null) {
+ resp.addToStatusList(new TSStatus(status));
+ }
+ else{
+ resp.addToStatusList(executePlan(plan));
+ }
+ }
+
+ return resp;
+ }
+
+ @Override
public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
long t1 = System.currentTimeMillis();
List<Integer> result = new ArrayList<>();
diff --git a/service-rpc/pom.xml b/service-rpc/pom.xml
index c3043e2..ce6abd4 100644
--- a/service-rpc/pom.xml
+++ b/service-rpc/pom.xml
@@ -148,7 +148,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
<pluginExecution>
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 5e08140..6425a98 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -51,6 +51,8 @@ Last Updated on October 27th, 2019 by Lei Rui.
| Add method TSStatus deleteData(1:TSDeleteDataReq req) | Jack Tsai, Jialin Qiao, qiaojialin |
| Add method TSStatus deleteTimeseries(1:list\<string> path) | qiaojialin |
| Add method TSStatus deleteStorageGroups(1:list\<string> storageGroup) | Yi Tao |
+| Add Struct TSExecuteInsertRowInBatchResp | Kaifeng Xue |
+| Add method TSExecuteInsertRowInBatchResp insertRowInBatch(1:TSInsertInBatchReq req) | Kaifeng Xue |
## 3. Update
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index 5de9c06..533aa3d 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -233,6 +233,18 @@ struct TSBatchInsertionReq {
6: required i32 size
}
+struct TSExecuteInsertRowInBatchResp{ // response of TSIService.insertRowInBatch
+ 1: required list<TSStatus> statusList // filled by the server: one status per processed row, or a single NOT_LOGIN_ERROR status
+}
+
+
+struct TSInsertInBatchReq { // a batch of single-row inserts; the server reads the four lists in parallel by index
+ 1: required list<string> deviceIds // device of row i
+ 2: required list<list<string>> measurementsList // measurement names of row i
+ 3: required list<list<string>> valuesList // values of row i, as strings
+ 4: required list<i64> timestamps // timestamp of row i
+}
+
struct TSInsertReq {
1: required string deviceId
2: required list<string> measurements
@@ -289,6 +301,8 @@ service TSIService {
TSExecuteBatchStatementResp insertBatch(1:TSBatchInsertionReq req);
+ TSExecuteInsertRowInBatchResp insertRowInBatch(1:TSInsertInBatchReq req);
+
TSStatus setStorageGroup(1:string storageGroup);
TSStatus createTimeseries(1:TSCreateTimeseriesReq req);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 78791b9..5f93e00 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -36,7 +36,6 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
@@ -57,7 +56,6 @@ public class Session {
private TSocket transport;
private boolean isClosed = true;
private ZoneId zoneId;
- private TSOperationHandle operationHandle;
private long statementId;
private int fetchSize;
@@ -86,11 +84,11 @@ public class Session {
this.fetchSize = fetchSize;
}
- public synchronized void open() throws IoTDBSessionException {
+ public void open() throws IoTDBSessionException {
open(false, 0);
}
- private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+ private void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBSessionException {
if (!isClosed) {
return;
@@ -146,7 +144,7 @@ public class Session {
}
- public synchronized void close() throws IoTDBSessionException {
+ public void close() throws IoTDBSessionException {
if (isClosed) {
return;
}
@@ -164,7 +162,7 @@ public class Session {
}
}
- public synchronized TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch)
+ public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch)
throws IoTDBSessionException {
TSBatchInsertionReq request = new TSBatchInsertionReq();
request.deviceId = rowBatch.deviceId;
@@ -183,7 +181,38 @@ public class Session {
}
}
- public synchronized TSStatus insert(String deviceId, long time, List<String> measurements,
+ /**
+  * Inserts several rows with a single RPC call, which reduces network overhead. Entry i of
+  * every argument list describes row i.
+  *
+  * @param deviceIds device of each row
+  * @param times timestamp of each row
+  * @param measurementsList measurement names of each row
+  * @param valuesList values of each row, as strings
+  * @return the statuses returned by the server, each passed through checkAndReturn
+  * @throws IllegalArgumentException if the four lists do not all have the same size
+  * @throws IoTDBSessionException if the RPC fails with a TException
+  */
+ public List<TSStatus> insertInBatch(List<String> deviceIds, List<Long> times,
+     List<List<String>> measurementsList,
+     List<List<String>> valuesList)
+     throws IoTDBSessionException {
+   // every list must describe the same number of rows
+   final int rowCount = deviceIds.size();
+   boolean sizesAgree = rowCount == times.size()
+       && rowCount == measurementsList.size()
+       && rowCount == valuesList.size();
+   if (!sizesAgree) {
+     throw new IllegalArgumentException(
+         "deviceIds, times, measurementsList and valuesList's size should be equal");
+   }
+
+   TSInsertInBatchReq batchReq = new TSInsertInBatchReq();
+   batchReq.setDeviceIds(deviceIds);
+   batchReq.setTimestamps(times);
+   batchReq.setMeasurementsList(measurementsList);
+   batchReq.setValuesList(valuesList);
+
+   try {
+     List<TSStatus> serverStatuses = client.insertRowInBatch(batchReq).getStatusList();
+     List<TSStatus> checked = new ArrayList<>();
+     for (TSStatus status : serverStatuses) {
+       checked.add(checkAndReturn(status));
+     }
+     return checked;
+   } catch (TException e) {
+     throw new IoTDBSessionException(e);
+   }
+ }
+
+ public TSStatus insert(String deviceId, long time, List<String> measurements,
List<String> values)
throws IoTDBSessionException {
TSInsertReq request = new TSInsertReq();
@@ -204,7 +233,7 @@ public class Session {
*
* @param path timeseries to delete, should be a whole path
*/
- synchronized TSStatus deleteTimeseries(String path) throws IoTDBSessionException {
+ public TSStatus deleteTimeseries(String path) throws IoTDBSessionException {
List<String> paths = new ArrayList<>();
paths.add(path);
return deleteTimeseries(paths);
@@ -215,7 +244,7 @@ public class Session {
*
* @param paths timeseries to delete, should be a whole path
*/
- public synchronized TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
+ public TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
try {
return checkAndReturn(client.deleteTimeseries(paths));
} catch (TException e) {
@@ -229,7 +258,7 @@ public class Session {
* @param path data in which time series to delete
* @param time data with time stamp less than or equal to time will be deleted
*/
- public synchronized TSStatus deleteData(String path, long time) throws IoTDBSessionException {
+ public TSStatus deleteData(String path, long time) throws IoTDBSessionException {
List<String> paths = new ArrayList<>();
paths.add(path);
return deleteData(paths, time);
@@ -241,7 +270,7 @@ public class Session {
* @param paths data in which time series to delete
* @param time data with time stamp less than or equal to time will be deleted
*/
- synchronized TSStatus deleteData(List<String> paths, long time)
+ public TSStatus deleteData(List<String> paths, long time)
throws IoTDBSessionException {
TSDeleteDataReq request = new TSDeleteDataReq();
request.setPaths(paths);
@@ -254,7 +283,7 @@ public class Session {
}
}
- public synchronized TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException {
+ public TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException {
checkPathValidity(storageGroupId);
try {
return checkAndReturn(client.setStorageGroup(storageGroupId));
@@ -264,14 +293,14 @@ public class Session {
}
- synchronized TSStatus deleteStorageGroup(String storageGroup)
+ public TSStatus deleteStorageGroup(String storageGroup)
throws IoTDBSessionException {
List<String> groups = new ArrayList<>();
groups.add(storageGroup);
return deleteStorageGroups(groups);
}
- synchronized TSStatus deleteStorageGroups(List<String> storageGroup)
+ public TSStatus deleteStorageGroups(List<String> storageGroup)
throws IoTDBSessionException {
try {
return checkAndReturn(client.deleteStorageGroups(storageGroup));
@@ -280,7 +309,7 @@ public class Session {
}
}
- public synchronized TSStatus createTimeseries(String path, TSDataType dataType,
+ public TSStatus createTimeseries(String path, TSDataType dataType,
TSEncoding encoding, CompressionType compressor) throws IoTDBSessionException {
checkPathValidity(path);
TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
@@ -310,7 +339,7 @@ public class Session {
return resp;
}
- private synchronized String getTimeZone() throws TException, IoTDBRPCException {
+ private String getTimeZone() throws TException, IoTDBRPCException {
if (zoneId != null) {
return zoneId.toString();
}
@@ -320,7 +349,7 @@ public class Session {
return resp.getTimeZone();
}
- private synchronized void setTimeZone(String zoneId) throws TException, IoTDBRPCException {
+ private void setTimeZone(String zoneId) throws TException, IoTDBRPCException {
TSSetTimeZoneReq req = new TSSetTimeZoneReq(zoneId);
TSStatus resp = client.setTimeZone(req);
RpcUtils.verifySuccess(resp);
@@ -355,9 +384,9 @@ public class Session {
TSExecuteStatementResp execResp = client.executeStatement(execReq);
RpcUtils.verifySuccess(execResp.getStatus());
- operationHandle = execResp.getOperationHandle();
SessionDataSet dataSet = new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(),
- operationHandle.getOperationId().getQueryId(), client, operationHandle);
+ execResp.getOperationHandle().getOperationId().getQueryId(), client,
+ execResp.getOperationHandle());
dataSet.setBatchSize(fetchSize);
return dataSet;
}
@@ -375,8 +404,6 @@ public class Session {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, statementId);
TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
- operationHandle = execResp.getOperationHandle();
-
RpcUtils.verifySuccess(execResp.getStatus());
}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 30c7afc..5a36510 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -93,6 +93,10 @@ public class IoTDBSessionIT {
query2();
+ insertInBatch();
+
+ query4();
+
// Add another storage group to test the deletion of storage group
session.setStorageGroup("root.sg2");
session.createTimeseries("root.sg2.d1.s1", TSDataType.INT64, TSEncoding.RLE,
@@ -132,6 +136,45 @@ public class IoTDBSessionIT {
CompressionType.SNAPPY);
session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE,
CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d2.s1", TSDataType.INT64, TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d2.s2", TSDataType.INT64, TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d2.s3", TSDataType.INT64, TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ }
+
+ private void insertInBatch() throws IoTDBSessionException {
+ String deviceId = "root.sg1.d2";
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ List<String> deviceIds = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<String>> valuesList = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+
+ for (long time = 0; time < 500; time++) {
+ List<String> values = new ArrayList<>();
+ values.add("1");
+ values.add("2");
+ values.add("3");
+
+ deviceIds.add(deviceId);
+ measurementsList.add(measurements);
+ valuesList.add(values);
+ timestamps.add(time);
+ if (time != 0 && time % 100 == 0) {
+ session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
+ deviceIds.clear();
+ measurementsList.clear();
+ valuesList.clear();
+ timestamps.clear();
+ }
+ }
+
+ session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
}
private void insert() throws IoTDBSessionException {
@@ -199,7 +242,8 @@ public class IoTDBSessionIT {
private void query() throws ClassNotFoundException, SQLException {
Class.forName(Config.JDBC_DRIVER_NAME);
String standard =
- "Time\n" + "root.sg1.d1.s1\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n";
+ "Time\n" + "root.sg1.d1.s1\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n"
+ + "root.sg1.d2.s1\n" + "root.sg1.d2.s2\n" + "root.sg1.d2.s3\n";
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
@@ -223,7 +267,8 @@ public class IoTDBSessionIT {
private void query2() throws ClassNotFoundException, SQLException {
Class.forName(Config.JDBC_DRIVER_NAME);
String standard =
- "Time\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n";
+ "Time\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n"
+ + "root.sg1.d2.s1\n" + "root.sg1.d2.s2\n" + "root.sg1.d2.s3\n";
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
@@ -300,6 +345,21 @@ public class IoTDBSessionIT {
sessionDataSet.closeOperationHandle();
}
+ /**
+  * Reads back root.sg1.d2 and checks that the fields of every returned row hold the
+  * values 1, 2, 3 (in field order) and that exactly 500 rows are returned.
+  */
+ private void query4() throws TException, IoTDBRPCException, SQLException {
+   SessionDataSet sessionDataSet = session.executeQueryStatement("select * from root.sg1.d2");
+   sessionDataSet.setBatchSize(1024);
+   int count = 0;
+   while (sessionDataSet.hasNext()) {
+     long expected = 1;
+     count++;
+     for (Field f : sessionDataSet.next().getFields()) {
+       // Expected value goes first, per JUnit's assertEquals(expected, actual) contract,
+       // so that a failure message reports the two values the right way round.
+       Assert.assertEquals(expected, f.getLongV());
+       expected++;
+     }
+   }
+   Assert.assertEquals(500, count);
+   sessionDataSet.closeOperationHandle();
+ }
private void insert_via_sql() throws TException, IoTDBRPCException {
session.executeNonQueryStatement(