You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/10 05:50:17 UTC
[iotdb] branch master updated: Support create sg and timeseries by session API in new cluster (#5836)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0604e5057d Support create sg and timeseries by session API in new cluster (#5836)
0604e5057d is described below
commit 0604e5057d18a56b4624492edec422b04c6a451f
Author: Haonan <hh...@outlook.com>
AuthorDate: Tue May 10 13:50:12 2022 +0800
Support create sg and timeseries by session API in new cluster (#5836)
---
.../db/mpp/plan/parser/StatementGenerator.java | 38 +++++-
.../thrift/impl/DataNodeTSIServiceImpl.java | 127 ++++++++++++++++++++-
2 files changed, 161 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 5a4eeb0153..51726de6ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -36,12 +36,15 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatemen
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
import org.apache.iotdb.db.qp.sql.SqlLexer;
import org.apache.iotdb.db.qp.strategy.SQLParseError;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -323,8 +326,15 @@ public class StatementGenerator {
return insertStatement;
}
+ /**
+  * Builds the statement that registers the given path as a storage group.
+  *
+  * @param storageGroup textual path of the storage group; it is wrapped in a {@link PartialPath}
+  * @return a {@link SetStorageGroupStatement} carrying that path
+  * @throws IllegalPathException propagated when the path or the statement cannot be built
+  */
+ public static Statement createStatement(String storageGroup) throws IllegalPathException {
+   final SetStorageGroupStatement setStorageGroup = new SetStorageGroupStatement();
+   final PartialPath storageGroupPath = new PartialPath(storageGroup);
+   setStorageGroup.setStorageGroupPath(storageGroupPath);
+   return setStorageGroup;
+ }
+
public static Statement createStatement(TSCreateTimeseriesReq req) throws IllegalPathException {
- // construct create timseries statement
+ // construct create timeseries statement
CreateTimeSeriesStatement statement = new CreateTimeSeriesStatement();
statement.setPath(new PartialPath(req.path));
statement.setDataType(TSDataType.values()[req.dataType]);
@@ -337,6 +347,32 @@ public class StatementGenerator {
return statement;
}
+ /**
+  * Builds a create-aligned-timeseries statement from a session API request.
+  *
+  * <p>The integer codes in {@code req.dataTypes}, {@code req.encodings} and {@code
+  * req.compressors} are used as indexes into the {@code values()} array of the matching enum.
+  * Tags, attributes and aliases are passed through unchanged.
+  *
+  * @param req the Thrift request describing the device and its aligned measurements
+  * @return a {@link CreateAlignedTimeSeriesStatement} populated from {@code req}
+  * @throws IllegalPathException propagated when {@code req.prefixPath} cannot be turned into a
+  *     {@link PartialPath}
+  */
+ public static Statement createStatement(TSCreateAlignedTimeseriesReq req)
+     throws IllegalPathException {
+   // construct create aligned timeseries statement
+   CreateAlignedTimeSeriesStatement statement = new CreateAlignedTimeSeriesStatement();
+   statement.setDevicePath(new PartialPath(req.prefixPath));
+   // The measurement names have to be carried over too; without them the statement only
+   // knows the device and the per-measurement properties, not which series to create.
+   statement.setMeasurements(req.measurements);
+
+   List<TSDataType> dataTypes = new ArrayList<>(req.dataTypes.size());
+   for (int dataType : req.dataTypes) {
+     dataTypes.add(TSDataType.values()[dataType]);
+   }
+   List<TSEncoding> encodings = new ArrayList<>(req.encodings.size());
+   for (int encoding : req.encodings) {
+     encodings.add(TSEncoding.values()[encoding]);
+   }
+   List<CompressionType> compressors = new ArrayList<>(req.compressors.size());
+   for (int compressor : req.compressors) {
+     compressors.add(CompressionType.values()[compressor]);
+   }
+   statement.setDataTypes(dataTypes);
+   statement.setEncodings(encodings);
+   statement.setCompressors(compressors);
+   statement.setTagsList(req.tagsList);
+   statement.setAttributesList(req.attributesList);
+   statement.setAliasList(req.measurementAlias);
+   return statement;
+ }
+
private static Statement invokeParser(String sql, ZoneId zoneId) {
ASTVisitor astVisitor = new ASTVisitor();
astVisitor.setZoneId(zoneId);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 65f3264b6d..c9635fced0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -42,6 +42,9 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.SessionTimeoutManager;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
@@ -261,17 +264,135 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
@Override
public TSStatus setStorageGroup(long sessionId, String storageGroup) {
- throw new UnsupportedOperationException();
+ try {
+ if (!SESSION_MANAGER.checkLogin(sessionId)) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session-{} create storage group {}", SESSION_MANAGER.getCurrSessionId(), storageGroup);
+ }
+
+ // Step 1: Create SetStorageGroupStatement
+ SetStorageGroupStatement statement =
+ (SetStorageGroupStatement) StatementGenerator.createStatement(storageGroup);
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(sessionId),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
+
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ }
}
@Override
public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
- throw new UnsupportedOperationException();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+ }
+
+ // Step 1: transfer from TSCreateTimeseriesReq to Statement
+ CreateTimeSeriesStatement statement =
+ (CreateTimeSeriesStatement) StatementGenerator.createStatement(req);
+
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
+
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ }
}
@Override
public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
- throw new UnsupportedOperationException();
+ try {
+ if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ // if measurements.size() == 1, convert to create timeseries
+ if (req.measurements.size() == 1) {
+ return createTimeseries(
+ new TSCreateTimeseriesReq(
+ req.sessionId,
+ req.prefixPath + "." + req.measurements.get(0),
+ req.dataTypes.get(0),
+ req.encodings.get(0),
+ req.compressors.get(0)));
+ }
+
+ if (AUDIT_LOGGER.isDebugEnabled()) {
+ AUDIT_LOGGER.debug(
+ "Session-{} create aligned timeseries {}.{}",
+ SESSION_MANAGER.getCurrSessionId(),
+ req.getPrefixPath(),
+ req.getMeasurements());
+ }
+
+ // Step 1: transfer from CreateAlignedTimeSeriesStatement to Statement
+ CreateAlignedTimeSeriesStatement statement =
+ (CreateAlignedTimeSeriesStatement) StatementGenerator.createStatement(req);
+
+ // permission check
+ TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ COORDINATOR.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER);
+
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ }
}
@Override