You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/21 08:41:41 UTC
[incubator-iotdb] branch master updated: [IOTDB-607] add batch create timeseries in rpc (#1075)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 39f8feb [IOTDB-607] add batch create timeseries in rpc (#1075)
39f8feb is described below
commit 39f8feb29f7fbe92ffd50971150dd1c1bf567c4f
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Tue Apr 21 16:41:34 2020 +0800
[IOTDB-607] add batch create timeseries in rpc (#1075)
* add batch create timeseries
* update rpc changelist
---
.../main/java/org/apache/iotdb/SessionExample.java | 60 +++++++++++--
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 98 ++++++++++++++++++----
.../apache/iotdb/db/sql/CheckPathValidityTest.java | 49 +++++++++++
service-rpc/rpc-changelist.md | 2 +
service-rpc/src/main/thrift/rpc.thrift | 14 ++++
.../main/java/org/apache/iotdb/session/Config.java | 14 ----
.../java/org/apache/iotdb/session/Session.java | 54 +++++++++---
.../org/apache/iotdb/session/pool/SessionPool.java | 31 +++++--
.../iotdb/session/CheckPathValidityTest.java | 49 -----------
.../org/apache/iotdb/session/IoTDBSessionIT.java | 41 ++++++++-
11 files changed, 320 insertions(+), 105 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 2654db7..020188f 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -53,6 +53,21 @@ public class SessionExample {
// ignore duplicated set
}
+ createTimeseries();
+ createMultiTimeseries();
+ insert();
+ insertInBatch();
+ insertRowBatch();
+ nonQuery();
+ query();
+ deleteData();
+ deleteTimeseries();
+ session.close();
+ }
+
+ private static void createTimeseries()
+ throws IoTDBConnectionException, StatementExecutionException {
+
if (!session.checkTimeseriesExists("root.sg1.d1.s1")) {
session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE,
CompressionType.SNAPPY);
@@ -65,6 +80,7 @@ public class SessionExample {
session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE,
CompressionType.SNAPPY);
}
+
if (!session.checkTimeseriesExists("root.sg1.d1.s4")) {
Map<String, String> tags = new HashMap<>();
tags.put("tag1", "v1");
@@ -73,15 +89,43 @@ public class SessionExample {
session.createTimeseries("root.sg1.d1.s4", TSDataType.INT64, TSEncoding.RLE,
CompressionType.SNAPPY, null, tags, attributes, "temperature");
}
+ }
- insert();
- insertInBatch();
- insertRowBatch();
- nonQuery();
- query();
- deleteData();
- deleteTimeseries();
- session.close();
+ private static void createMultiTimeseries()
+ throws IoTDBConnectionException, BatchExecutionException {
+
+ List<String> paths = new ArrayList<>();
+ paths.add("root.sg1.d2.s1");
+ paths.add("root.sg1.d2.s2");
+ List<TSDataType> tsDataTypes = new ArrayList<>();
+ tsDataTypes.add(TSDataType.DOUBLE);
+ tsDataTypes.add(TSDataType.DOUBLE);
+ List<TSEncoding> tsEncodings = new ArrayList<>();
+ tsEncodings.add(TSEncoding.RLE);
+ tsEncodings.add(TSEncoding.RLE);
+ List<CompressionType> compressionTypes = new ArrayList<>();
+ compressionTypes.add(CompressionType.SNAPPY);
+ compressionTypes.add(CompressionType.SNAPPY);
+
+ List<Map<String, String>> tagsList = new ArrayList<>();
+ Map<String, String> tags = new HashMap<>();
+ tags.put("unit", "kg");
+ tagsList.add(tags);
+ tagsList.add(tags);
+
+ List<Map<String, String>> attributesList = new ArrayList<>();
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("minValue", "1");
+ attributes.put("maxValue", "100");
+ attributesList.add(attributes);
+ attributesList.add(attributes);
+
+ List<String> alias = new ArrayList<>();
+ alias.add("weight1");
+ alias.add("weight2");
+
+ session.createMultiTimeseries(paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList,
+ attributesList, alias);
}
private static void insert() throws IoTDBConnectionException, StatementExecutionException {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9b8f91f..d9a406e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.conf;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_ROOT;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
import org.apache.iotdb.db.exception.LoadConfigurationException;
@@ -43,6 +46,16 @@ public class IoTDBConfig {
private static final String MULTI_DIR_STRATEGY_PREFIX =
"org.apache.iotdb.db.conf.directories.strategy.";
private static final String DEFAULT_MULTI_DIR_STRATEGY = "MaxDiskUsableSpaceFirstStrategy";
+
+ private static final String NODE_MATCHER =
+ "[" + PATH_SEPARATOR + "]" + "([a-zA-Z0-9\u2E80-\u9FFF_]+)";
+
+ // for path like: root.sg1.d1."1.2.3" or root.sg1.d1.'1.2.3', only occurs in the end of the path and only occurs once
+ private static final String NODE_WITH_QUOTATION_MARK_MATCHER =
+ "[" + PATH_SEPARATOR + "][\"|\']([a-zA-Z0-9\u2E80-\u9FFF_]+)(" + NODE_MATCHER + ")+[\"|\']";
+ public static final Pattern PATH_PATTERN = Pattern
+ .compile(PATH_ROOT + "(" + NODE_MATCHER + ")+(" + NODE_WITH_QUOTATION_MARK_MATCHER + ")?");
+
/**
* Port which the metrics service listens to.
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index cfbfa83..9ec9e7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -55,6 +55,7 @@ import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -78,6 +79,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import static org.apache.iotdb.db.conf.IoTDBConfig.PATH_PATTERN;
import static org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType.TIMESERIES;
@@ -1183,8 +1185,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
+ TSStatus status = checkPathValidity(storageGroup);
+ if (status != null) {
+ return status;
+ }
+
SetStorageGroupPlan plan = new SetStorageGroupPlan(new Path(storageGroup));
- TSStatus status = checkAuthority(plan, sessionId);
+ status = checkAuthority(plan, sessionId);
if (status != null) {
return new TSStatus(status);
}
@@ -1211,23 +1218,78 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
- //TODO : init tags
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
}
- CreateTimeSeriesPlan plan =
- new CreateTimeSeriesPlan(
- new Path(req.getPath()),
- TSDataType.values()[req.getDataType()],
- TSEncoding.values()[req.getEncoding()],
- CompressionType.values()[req.compressor],
- req.props, req.tags, req.attributes, req.measurementAlias);
- TSStatus status = checkAuthority(plan, req.getSessionId());
+
+ TSStatus status = checkPathValidity(req.path);
if (status != null) {
- return new TSStatus(status);
+ return status;
+ }
+
+ CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(new Path(req.path),
+ TSDataType.values()[req.dataType], TSEncoding.values()[req.encoding],
+ CompressionType.values()[req.compressor], req.props, req.tags, req.attributes,
+ req.measurementAlias);
+ status = checkAuthority(plan, req.getSessionId());
+ if (status != null) {
+ return status;
+ }
+ return executePlan(plan);
+ }
+
+ @Override
+ public TSExecuteBatchStatementResp createMultiTimeseries(TSCreateMultiTimeseriesReq req) {
+ if (!checkLogin(req.getSessionId())) {
+ logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+ return RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
+ }
+
+ List<TSStatus> statusList = new ArrayList<>(req.paths.size());
+ for (int i = 0; i < req.paths.size(); i++) {
+ CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(new Path(req.getPaths().get(i)),
+ TSDataType.values()[req.dataTypes.get(i)], TSEncoding.values()[req.encodings.get(i)],
+ CompressionType.values()[req.compressors.get(i)],
+ req.propsList == null ? null : req.propsList.get(i),
+ req.tagsList == null ? null : req.tagsList.get(i),
+ req.attributesList == null ? null : req.attributesList.get(i),
+ req.measurementAliasList == null ? null : req.measurementAliasList.get(i));
+
+ TSStatus status = checkPathValidity(req.paths.get(i));
+ if (status != null) {
+ // path naming is not valid
+ statusList.add(status);
+ continue;
+ }
+
+ status = checkAuthority(plan, req.getSessionId());
+ if (status != null) {
+ // not authorized
+ statusList.add(status);
+ continue;
+ }
+
+ statusList.add(executePlan(plan));
+ }
+
+ boolean isAllSuccessful = true;
+ for (TSStatus tsStatus : statusList) {
+ if (tsStatus.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ isAllSuccessful = false;
+ break;
+ }
+ }
+
+ if (isAllSuccessful) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Create multiple timeseries successfully");
+ }
+ return RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+ } else {
+ logger.debug("Create multiple timeseries failed!");
+ return RpcUtils.getTSBatchExecuteStatementResp(statusList);
}
- return new TSStatus(executePlan(plan));
}
@Override
@@ -1243,9 +1305,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList);
TSStatus status = checkAuthority(plan, sessionId);
if (status != null) {
- return new TSStatus(status);
+ return status;
}
- return new TSStatus(executePlan(plan));
+ return executePlan(plan);
}
@Override
@@ -1284,6 +1346,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
: RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
+ private TSStatus checkPathValidity(String path) {
+ if (!PATH_PATTERN.matcher(path).matches()) {
+ return RpcUtils.getStatus(TSStatusCode.PATH_ILLEGAL);
+ } else {
+ return null;
+ }
+ }
+
public static List<SqlArgument> getSqlArgumentList() {
return sqlArgumentList;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sql/CheckPathValidityTest.java b/server/src/test/java/org/apache/iotdb/db/sql/CheckPathValidityTest.java
new file mode 100644
index 0000000..2204c3b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/sql/CheckPathValidityTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sql;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.junit.Test;
+
+public class CheckPathValidityTest {
+
+ @Test
+ public void testCheckPathValidity() {
+ assertTrue(IoTDBConfig.PATH_PATTERN.matcher("root.vehicle").matches());
+ assertTrue(IoTDBConfig.PATH_PATTERN.matcher("root.123456").matches());
+ assertTrue(IoTDBConfig.PATH_PATTERN.matcher("root._1234").matches());
+ assertTrue(IoTDBConfig.PATH_PATTERN.matcher("root._vehicle").matches());
+ assertTrue(IoTDBConfig.PATH_PATTERN.matcher("root.1234a4").matches());
+ assertTrue(IoTDBConfig.PATH_PATTERN.matcher("root.1_2").matches());
+ assertTrue(IoTDBConfig.PATH_PATTERN.matcher("root.vehicle.1245.1.2.3").matches());
+ assertTrue(IoTDBConfig.PATH_PATTERN.matcher("root.vehicle.1245.\"1.2.3\"").matches());
+ assertTrue(IoTDBConfig.PATH_PATTERN.matcher("root.vehicle.1245.\'1.2.3\'").matches());
+
+ assertFalse(IoTDBConfig.PATH_PATTERN.matcher("vehicle").matches());
+ assertFalse(IoTDBConfig.PATH_PATTERN.matcher("root.\tvehicle").matches());
+ assertFalse(IoTDBConfig.PATH_PATTERN.matcher("root.\nvehicle").matches());
+ assertFalse(IoTDBConfig.PATH_PATTERN.matcher("root..vehicle").matches());
+ assertFalse(IoTDBConfig.PATH_PATTERN.matcher("root.%12345").matches());
+ assertFalse(IoTDBConfig.PATH_PATTERN.matcher("root.a{12345}").matches());
+ }
+}
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 32f3859..5a15ea8 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -63,6 +63,8 @@ Last Updated on October 27th, 2019 by Lei Rui.
| Add method testInsertRowInBatch(1:TSInsertInBatchReq req); | Kaifeng Xue |
| Add method testInsertRow(1:TSInsertReq req); | Kaifeng Xue |
| Add method testInsertBatch(1:TSBatchInsertionReq req); | Kaifeng Xue |
+| Add struct TSCreateMultiTimeseriesReq | qiaojialin |
+| Add method createMultiTimeseries(1:TSCreateMultiTimeseriesReq req); | qiaojialin |
## 3. Update
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index 55f5ea5..a0faa41 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -209,6 +209,18 @@ struct TSCreateTimeseriesReq {
9: optional string measurementAlias
}
+struct TSCreateMultiTimeseriesReq {
+ 1: required i64 sessionId
+ 2: required list<string> paths
+ 3: required list<i32> dataTypes
+ 4: required list<i32> encodings
+ 5: required list<i32> compressors
+ 6: optional list<map<string, string>> propsList
+ 7: optional list<map<string, string>> tagsList
+ 8: optional list<map<string, string>> attributesList
+ 9: optional list<string> measurementAliasList
+}
+
struct ServerProperties {
1: required string version;
2: required list<string> supportedTimeAggregationOperations;
@@ -263,6 +275,8 @@ service TSIService {
TSStatus createTimeseries(1:TSCreateTimeseriesReq req);
+ TSExecuteBatchStatementResp createMultiTimeseries(1:TSCreateMultiTimeseriesReq req);
+
TSStatus deleteTimeseries(1:i64 sessionId, 2:list<string> path)
TSStatus deleteStorageGroups(1:i64 sessionId, 2:list<string> storageGroup);
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index 03e5a26..325ae26 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -18,11 +18,6 @@
*/
package org.apache.iotdb.session;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_ROOT;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
-import java.util.regex.Pattern;
-
public class Config {
public static final String DEFAULT_USER = "user";
@@ -30,13 +25,4 @@ public class Config {
public static final int DEFAULT_FETCH_SIZE = 10000;
public static final int DEFAULT_TIMEOUT_MS = 0;
- public static final String NODE_MATCHER =
- "[" + PATH_SEPARATOR + "]" + "([a-zA-Z0-9\u2E80-\u9FFF_]+)";
-
- // for path like: root.sg1.d1."1.2.3" or root.sg1.d1.'1.2.3', only occurs in the end of the path and only occurs once
- public static final String NODE_WITH_QUOTATION_MARK_MATCHER =
- "[" + PATH_SEPARATOR + "][\"|\']([a-zA-Z0-9\u2E80-\u9FFF_]+)(" + NODE_MATCHER + ")+[\"|\']";
- public static final Pattern PATH_PATTERN = Pattern
- .compile(PATH_ROOT + "(" + NODE_MATCHER + ")+(" + NODE_WITH_QUOTATION_MARK_MATCHER + ")?");
-
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index bb8ba86..9c51591 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.session;
-import static org.apache.iotdb.session.Config.PATH_PATTERN;
-
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
@@ -32,6 +30,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
@@ -562,7 +561,6 @@ public class Session {
public void setStorageGroup(String storageGroupId)
throws IoTDBConnectionException, StatementExecutionException {
- checkPathValidity(storageGroupId);
try {
RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroupId));
} catch (TException e) {
@@ -597,7 +595,6 @@ public class Session {
TSEncoding encoding, CompressionType compressor, Map<String, String> props,
Map<String, String> tags, Map<String, String> attributes, String measurementAlias)
throws IoTDBConnectionException, StatementExecutionException {
- checkPathValidity(path);
TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
request.setSessionId(sessionId);
request.setPath(path);
@@ -616,9 +613,47 @@ public class Session {
}
}
- public boolean checkTimeseriesExists(String path)
- throws StatementExecutionException, IoTDBConnectionException {
- checkPathValidity(path);
+ public void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes,
+ List<TSEncoding> encodings, List<CompressionType> compressors,
+ List<Map<String, String>> propsList, List<Map<String, String>> tagsList,
+ List<Map<String, String>> attributesList, List<String> measurementAliasList)
+ throws IoTDBConnectionException, BatchExecutionException {
+
+ TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
+ request.setSessionId(sessionId);
+ request.setPaths(paths);
+
+ List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size());
+ for (TSDataType dataType: dataTypes) {
+ dataTypeOrdinals.add(dataType.ordinal());
+ }
+ request.setDataTypes(dataTypeOrdinals);
+
+ List<Integer> encodingOrdinals = new ArrayList<>(paths.size());
+ for (TSEncoding encoding: encodings) {
+ encodingOrdinals.add(encoding.ordinal());
+ }
+ request.setEncodings(encodingOrdinals);
+
+ List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
+ for (CompressionType compression: compressors) {
+ compressionOrdinals.add(compression.ordinal());
+ }
+ request.setCompressors(compressionOrdinals);
+
+ request.setPropsList(propsList);
+ request.setTagsList(tagsList);
+ request.setAttributesList(attributesList);
+ request.setMeasurementAliasList(measurementAliasList);
+
+ try {
+ RpcUtils.verifySuccess(client.createMultiTimeseries(request).statusList);
+ } catch (TException e) {
+ throw new IoTDBConnectionException(e);
+ }
+ }
+
+ public boolean checkTimeseriesExists(String path) throws IoTDBConnectionException {
try {
return executeQueryStatement(String.format("SHOW TIMESERIES %s", path)).hasNext();
} catch (Exception e) {
@@ -714,9 +749,4 @@ public class Session {
}
}
- private void checkPathValidity(String path) throws StatementExecutionException {
- if (!PATH_PATTERN.matcher(path).matches()) {
- throw new StatementExecutionException(String.format("Path [%s] is invalid", path));
- }
- }
}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 42ab6e9..e87cb03 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -675,19 +675,23 @@ public class SessionPool {
String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
}
- public boolean checkTimeseriesExists(String path)
- throws IoTDBConnectionException, StatementExecutionException {
+ public void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes,
+ List<TSEncoding> encodings, List<CompressionType> compressors,
+ List<Map<String, String>> propsList, List<Map<String, String>> tagsList,
+ List<Map<String, String>> attributesList, List<String> measurementAliasList)
+ throws IoTDBConnectionException, BatchExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
- boolean resp = session.checkTimeseriesExists(path);
+ session.createMultiTimeseries(paths, dataTypes, encodings, compressors, propsList, tagsList,
+ attributesList, measurementAliasList);
putBack(session);
- return resp;
+ return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
closeSession(session);
removeSession();
- } catch (StatementExecutionException e) {
+ } catch (BatchExecutionException e) {
putBack(session);
throw e;
}
@@ -696,6 +700,23 @@ public class SessionPool {
String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
}
+ public boolean checkTimeseriesExists(String path) throws IoTDBConnectionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ boolean resp = session.checkTimeseriesExists(path);
+ putBack(session);
+ return resp;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
/**
* execure query sql users must call closeResultSet(SessionDataSetWrapper) if they do not use the
* SessionDataSet any more. users do not need to call sessionDataSet.closeOpeationHandler() any
diff --git a/session/src/test/java/org/apache/iotdb/session/CheckPathValidityTest.java b/session/src/test/java/org/apache/iotdb/session/CheckPathValidityTest.java
deleted file mode 100644
index 296bf32..0000000
--- a/session/src/test/java/org/apache/iotdb/session/CheckPathValidityTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.session;
-
-import static org.apache.iotdb.session.Config.PATH_PATTERN;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-public class CheckPathValidityTest {
-
- @Test
- public void testCheckPathValidity() {
- assertTrue(PATH_PATTERN.matcher("root.vehicle").matches());
- assertTrue(PATH_PATTERN.matcher("root.123456").matches());
- assertTrue(PATH_PATTERN.matcher("root._1234").matches());
- assertTrue(PATH_PATTERN.matcher("root._vehicle").matches());
- assertTrue(PATH_PATTERN.matcher("root.1234a4").matches());
- assertTrue(PATH_PATTERN.matcher("root.1_2").matches());
- assertTrue(PATH_PATTERN.matcher("root.vehicle.1245.1.2.3").matches());
- assertTrue(PATH_PATTERN.matcher("root.vehicle.1245.\"1.2.3\"").matches());
- assertTrue(PATH_PATTERN.matcher("root.vehicle.1245.\'1.2.3\'").matches());
-
- assertFalse(PATH_PATTERN.matcher("vehicle").matches());
- assertFalse(PATH_PATTERN.matcher("root.\tvehicle").matches());
- assertFalse(PATH_PATTERN.matcher("root.\nvehicle").matches());
- assertFalse(PATH_PATTERN.matcher("root..vehicle").matches());
- assertFalse(PATH_PATTERN.matcher("root.%12345").matches());
- assertFalse(PATH_PATTERN.matcher("root.a{12345}").matches());
- }
-}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index f4577b2..fa0d79c 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -30,7 +30,9 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -73,8 +75,7 @@ public class IoTDBSessionIT {
}
@Test
- public void testInsertByObject()
- throws IoTDBConnectionException, SQLException, ClassNotFoundException, StatementExecutionException {
+ public void testInsertByObject() throws IoTDBConnectionException, StatementExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();
@@ -91,6 +92,7 @@ public class IoTDBSessionIT {
}
+ @Test
public void testAlignByDevice() throws IoTDBConnectionException,
StatementExecutionException, BatchExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
@@ -158,6 +160,40 @@ public class IoTDBSessionIT {
queryForBatch();
}
+ @Test
+ public void testCreateMultiTimeseries() throws IoTDBConnectionException, BatchExecutionException {
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+
+ List<String> paths = new ArrayList<>();
+ paths.add("root.sg1.d1.s1");
+ paths.add("root.sg1.d1.s2");
+ List<TSDataType> tsDataTypes = new ArrayList<>();
+ tsDataTypes.add(TSDataType.DOUBLE);
+ tsDataTypes.add(TSDataType.DOUBLE);
+ List<TSEncoding> tsEncodings = new ArrayList<>();
+ tsEncodings.add(TSEncoding.RLE);
+ tsEncodings.add(TSEncoding.RLE);
+ List<CompressionType> compressionTypes = new ArrayList<>();
+ compressionTypes.add(CompressionType.SNAPPY);
+ compressionTypes.add(CompressionType.SNAPPY);
+
+ List<Map<String, String>> tagsList = new ArrayList<>();
+ Map<String, String> tags = new HashMap<>();
+ tags.put("tag1", "v1");
+ tagsList.add(tags);
+ tagsList.add(tags);
+
+ session
+ .createMultiTimeseries(paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList,
+ null, null);
+
+ Assert.assertTrue(session.checkTimeseriesExists("root.sg1.d1.s1"));
+ Assert.assertTrue(session.checkTimeseriesExists("root.sg1.d1.s2"));
+
+ }
+
+ @Test
public void testTestMethod()
throws StatementExecutionException, IoTDBConnectionException, BatchExecutionException {
@@ -368,7 +404,6 @@ public class IoTDBSessionIT {
private void insertInChinese(String storageGroup, String[] devices)
throws StatementExecutionException, IoTDBConnectionException {
for (String path : devices) {
- String fullPath = storageGroup + "." + path;
for (int i = 0; i < 10; i++) {
String[] ss = path.split("\\.");
String deviceId = storageGroup;