You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/09/21 11:51:56 UTC
[incubator-iotdb] 02/02: add delete timeseries in session
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch delete_timeseries_in_session
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 3a281cfbd97721d42b17fcfe823165c1e6f0d32b
Author: qiaojialin <64...@qq.com>
AuthorDate: Sat Sep 21 19:51:38 2019 +0800
add delete timeseries in session
---
.../UserGuide/7-Session API/1-Session API.md | 30 +++++++----
.../UserGuide/7-Session API/1-Session API.md | 30 +++++++----
.../main/java/org/apache/iotdb/SessionExample.java | 28 +++++++----
.../iotdb/db/qp/physical/crud/DeletePlan.java | 4 ++
.../iotdb/db/qp/physical/sys/MetadataPlan.java | 12 +++++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 31 ++++++++++--
service-rpc/src/main/thrift/rpc.thrift | 8 +--
.../java/org/apache/iotdb/session/Session.java | 42 ++++++++++++++--
.../org/apache/iotdb/session/IoTDBSessionIT.java | 58 +++++++++++++++++-----
9 files changed, 188 insertions(+), 55 deletions(-)
diff --git a/docs/Documentation-CHN/UserGuide/7-Session API/1-Session API.md b/docs/Documentation-CHN/UserGuide/7-Session API/1-Session API.md
index 01bb6f5..dfc1efd 100644
--- a/docs/Documentation-CHN/UserGuide/7-Session API/1-Session API.md
+++ b/docs/Documentation-CHN/UserGuide/7-Session API/1-Session API.md
@@ -74,9 +74,9 @@ public class SessionExample {
session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
insert();
-// insertRowBatch();
- delete();
-
+ insertRowBatch();
+ deleteData();
+ deleteTimeseries();
session.close();
}
@@ -86,7 +86,7 @@ public class SessionExample {
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
- for (long time = 0; time < 30000; time++) {
+ for (long time = 0; time < 100; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
@@ -95,12 +95,6 @@ public class SessionExample {
}
}
- private static void delete() throws IoTDBSessionException {
- String path = "root.sg1.d1.s1";
- long deleteTime = 29999;
- session.delete(path, deleteTime);
- }
-
private static void insertRowBatch() throws IoTDBSessionException {
Schema schema = new Schema();
schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
@@ -112,7 +106,7 @@ public class SessionExample {
long[] timestamps = rowBatch.timestamps;
Object[] values = rowBatch.values;
- for (long time = 0; time < 30000; time++) {
+ for (long time = 0; time < 100; time++) {
int row = rowBatch.batchSize++;
timestamps[row] = time;
for (int i = 0; i < 3; i++) {
@@ -130,6 +124,20 @@ public class SessionExample {
rowBatch.reset();
}
}
+
+ private static void deleteData() throws IoTDBSessionException {
+ String path = "root.sg1.d1.s1";
+ long deleteTime = 99;
+ session.deleteData(path, deleteTime);
+ }
+
+ private static void deleteTimeseries() throws IoTDBSessionException {
+ List<String> paths = new ArrayList<>();
+ paths.add("root.sg1.d1.s1");
+ paths.add("root.sg1.d1.s2");
+ paths.add("root.sg1.d1.s3");
+ session.deleteTimeseries(paths);
+ }
}
```
diff --git a/docs/Documentation/UserGuide/7-Session API/1-Session API.md b/docs/Documentation/UserGuide/7-Session API/1-Session API.md
index 82e881c..0bf86ba 100644
--- a/docs/Documentation/UserGuide/7-Session API/1-Session API.md
+++ b/docs/Documentation/UserGuide/7-Session API/1-Session API.md
@@ -83,9 +83,9 @@ public class SessionExample {
session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
insert();
-// insertRowBatch();
- delete();
-
+ insertRowBatch();
+ deleteData();
+ deleteTimeseries();
session.close();
}
@@ -95,7 +95,7 @@ public class SessionExample {
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
- for (long time = 0; time < 30000; time++) {
+ for (long time = 0; time < 100; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
@@ -104,12 +104,6 @@ public class SessionExample {
}
}
- private static void delete() throws IoTDBSessionException {
- String path = "root.sg1.d1.s1";
- long deleteTime = 29999;
- session.delete(path, deleteTime);
- }
-
private static void insertRowBatch() throws IoTDBSessionException {
Schema schema = new Schema();
schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
@@ -121,7 +115,7 @@ public class SessionExample {
long[] timestamps = rowBatch.timestamps;
Object[] values = rowBatch.values;
- for (long time = 0; time < 30000; time++) {
+ for (long time = 0; time < 100; time++) {
int row = rowBatch.batchSize++;
timestamps[row] = time;
for (int i = 0; i < 3; i++) {
@@ -139,6 +133,20 @@ public class SessionExample {
rowBatch.reset();
}
}
+
+ private static void deleteData() throws IoTDBSessionException {
+ String path = "root.sg1.d1.s1";
+ long deleteTime = 99;
+ session.deleteData(path, deleteTime);
+ }
+
+ private static void deleteTimeseries() throws IoTDBSessionException {
+ List<String> paths = new ArrayList<>();
+ paths.add("root.sg1.d1.s1");
+ paths.add("root.sg1.d1.s2");
+ paths.add("root.sg1.d1.s3");
+ session.deleteTimeseries(paths);
+ }
}
```
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 3c43e1a..857e1b5 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -43,9 +43,9 @@ public class SessionExample {
session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
insert();
-// insertRowBatch();
- delete();
-
+ insertRowBatch();
+ deleteData();
+ deleteTimeseries();
session.close();
}
@@ -57,19 +57,13 @@ public class SessionExample {
measurements.add("s3");
for (long time = 0; time < 100; time++) {
List<String> values = new ArrayList<>();
- values.add("1a");
+ values.add("1");
values.add("2");
values.add("3");
session.insert(deviceId, time, measurements, values);
}
}
- private static void delete() throws IoTDBSessionException {
- String path = "root.sg1.d1.s1";
- long deleteTime = 99;
- session.deleteData(path, deleteTime);
- }
-
private static void insertRowBatch() throws IoTDBSessionException {
Schema schema = new Schema();
schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
@@ -99,4 +93,18 @@ public class SessionExample {
rowBatch.reset();
}
}
+
+ private static void deleteData() throws IoTDBSessionException {
+ String path = "root.sg1.d1.s1";
+ long deleteTime = 99;
+ session.deleteData(path, deleteTime);
+ }
+
+ private static void deleteTimeseries() throws IoTDBSessionException {
+ List<String> paths = new ArrayList<>();
+ paths.add("root.sg1.d1.s1");
+ paths.add("root.sg1.d1.s2");
+ paths.add("root.sg1.d1.s3");
+ session.deleteTimeseries(paths);
+ }
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
index 2bd139b..c427802 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
@@ -71,6 +71,10 @@ public class DeletePlan extends PhysicalPlan {
this.paths.add(path);
}
+ public void addPaths(List<Path> paths) {
+ this.paths.addAll(paths);
+ }
+
@Override
public List<Path> getPaths() {
return paths;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
index ae8f8b4..4b3866d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
@@ -76,6 +76,18 @@ public class MetadataPlan extends PhysicalPlan {
setOperatorType(namespaceType);
}
+ /**
+ * delete time series plan
+ * @param namespaceType
+ * @param path
+ */
+ public MetadataPlan(MetadataOperator.NamespaceType namespaceType, List<Path> path) {
+ super(false, Operator.OperatorType.METADATA);
+ this.namespaceType = namespaceType;
+ this.deletePathList = path;
+ setOperatorType(namespaceType);
+ }
+
public Path getPath() {
return path;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 8b5449b..59c3ce9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.db.qp.QueryProcessor;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.logical.sys.MetadataOperator;
+import org.apache.iotdb.db.qp.logical.sys.MetadataOperator.NamespaceType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -81,7 +82,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSDeleteReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
@@ -1067,7 +1068,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
@Override
- public TSStatus deleteData(TSDeleteReq req) throws TException {
+ public TSStatus deleteData(TSDeleteDataReq req) {
if (!checkLogin()) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
return new TSStatus(getStatus(TSStatusCode.NOT_LOGIN_ERROR));
@@ -1075,7 +1076,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
DeletePlan plan = new DeletePlan();
plan.setDeleteTime(req.getTimestamp());
- plan.addPath(new Path(req.getPath()));
+ List<Path> paths = new ArrayList<>();
+ for (String path: req.getPaths()) {
+ paths.add(new Path(path));
+ }
+ plan.addPaths(paths);
TSStatus status = checkAuthority(plan);
if (status != null) {
@@ -1147,7 +1152,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
@Override
- public TSStatus createTimeseries(TSCreateTimeseriesReq req) throws TException {
+ public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
if (!checkLogin()) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
return new TSStatus(getStatus(TSStatusCode.NOT_LOGIN_ERROR));
@@ -1164,6 +1169,24 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
@Override
+ public TSStatus deleteTimeseries(List<String> paths) {
+ if (!checkLogin()) {
+ logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+ return new TSStatus(getStatus(TSStatusCode.NOT_LOGIN_ERROR));
+ }
+ List<Path> pathList = new ArrayList<>();
+ for (String path: paths) {
+ pathList.add(new Path(path));
+ }
+ MetadataPlan plan = new MetadataPlan(NamespaceType.DELETE_PATH, pathList);
+ TSStatus status = checkAuthority(plan);
+ if (status != null) {
+ return new TSStatus(status);
+ }
+ return new TSStatus(executePlan(plan));
+ }
+
+ @Override
public long requestStatementId() {
return globalStmtId.incrementAndGet();
}
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index c08f9aa..88fb51b 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -246,8 +246,8 @@ struct TSInsertReq {
4: required i64 timestamp
}
-struct TSDeleteReq {
- 1: required string path
+struct TSDeleteDataReq {
+ 1: required list<string> paths
2: required i64 timestamp
}
@@ -299,9 +299,11 @@ service TSIService {
TSStatus createTimeseries(1:TSCreateTimeseriesReq req);
+ TSStatus deleteTimeseries(1:list<string> path)
+
TSStatus insertRow(1:TSInsertReq req);
- TSStatus deleteData(1:TSDeleteReq req);
+ TSStatus deleteData(1:TSDeleteDataReq req);
i64 requestStatementId();
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 272ffea..03f130b 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.session;
+import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
@@ -177,13 +178,46 @@ public class Session {
}
/**
- * @param path data of whose time series to delete
+ * delete a timeseries, including data and schema
+ * @param path timeseries to delete, should be a whole path
+ */
+ public synchronized TSStatus deleteTimeseries(String path) throws IoTDBSessionException {
+ List<String> paths = new ArrayList<>();
+ paths.add(path);
+ return deleteTimeseries(paths);
+ }
+
+ /**
+ * delete a timeseries, including data and schema
+ * @param paths timeseries to delete, should be a whole path
+ */
+ public synchronized TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
+ try {
+ return checkAndReturn(client.deleteTimeseries(paths));
+ } catch (TException e) {
+ throw new IoTDBSessionException(e);
+ }
+ }
+
+ /**
+ * delete data <= time in one timeseries
+ * @param path data in which time series to delete
* @param time data with time stamp less than or equal to time will be deleted
- * @return result statue
*/
public synchronized TSStatus deleteData(String path, long time) throws IoTDBSessionException {
- TSDeleteReq request = new TSDeleteReq();
- request.setPath(path);
+ List<String> paths = new ArrayList<>();
+ paths.add(path);
+ return deleteData(paths, time);
+ }
+
+ /**
+ * delete data <= time in multiple timeseries
+ * @param paths data in which time series to delete
+ * @param time data with time stamp less than or equal to time will be deleted
+ */
+ public synchronized TSStatus deleteData(List<String> paths, long time) throws IoTDBSessionException {
+ TSDeleteDataReq request = new TSDeleteDataReq();
+ request.setPaths(paths);
request.setTimestamp(time);
try {
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 740d7e5..839f27f 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -61,23 +61,27 @@ public class IoTDBSessionIT {
session.setStorageGroup("root.sg1");
- createTimeseriesTest();
- insertTest();
+ createTimeseries();
+ insert();
// insertRowBatchTest();
- deleteTest();
+ deleteData();
- queryTest();
+ query();
+
+ deleteTimeseries();
+
+ query2();
session.close();
}
- public void createTimeseriesTest() throws IoTDBSessionException {
+ private void createTimeseries() throws IoTDBSessionException {
session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
}
- public void insertTest() throws IoTDBSessionException {
+ private void insert() throws IoTDBSessionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
@@ -92,7 +96,7 @@ public class IoTDBSessionIT {
}
}
- private void insertRowBatchTest() throws IoTDBSessionException {
+ private void insertRowBatch() throws IoTDBSessionException {
Schema schema = new Schema();
schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
@@ -122,18 +126,24 @@ public class IoTDBSessionIT {
}
}
- public void deleteTest() throws IoTDBSessionException {
+ private void deleteData() throws IoTDBSessionException {
String path1 = "root.sg1.d1.s1";
String path2 = "root.sg1.d1.s2";
String path3 = "root.sg1.d1.s3";
long deleteTime = 99;
- session.deleteData(path1, deleteTime);
- session.deleteData(path2, deleteTime);
- session.deleteData(path3, deleteTime);
+ List<String> paths = new ArrayList<>();
+ paths.add(path1);
+ paths.add(path2);
+ paths.add(path3);
+ session.deleteData(paths, deleteTime);
+ }
+
+ private void deleteTimeseries() throws IoTDBSessionException {
+ session.deleteTimeseries("root.sg1.d1.s1");
}
- public void queryTest() throws ClassNotFoundException, SQLException {
+ private void query() throws ClassNotFoundException, SQLException {
Class.forName(Config.JDBC_DRIVER_NAME);
String standard =
"Time\n" + "root.sg1.d1.s1\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n";
@@ -156,4 +166,28 @@ public class IoTDBSessionIT {
Assert.assertEquals(resultStr.toString(), standard);
}
}
+
+ private void query2() throws ClassNotFoundException, SQLException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ String standard =
+ "Time\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n";
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery("select * from root");
+ final ResultSetMetaData metaData = resultSet.getMetaData();
+ final int colCount = metaData.getColumnCount();
+ StringBuilder resultStr = new StringBuilder();
+ for (int i = 0; i < colCount; i++) {
+ resultStr.append(metaData.getColumnLabel(i + 1) + "\n");
+ }
+ while (resultSet.next()) {
+ for (int i = 1; i <= colCount; i++) {
+ resultStr.append(resultSet.getString(i)).append(",");
+ }
+ resultStr.append("\n");
+ }
+ Assert.assertEquals(resultStr.toString(), standard);
+ }
+ }
}