You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/10 05:50:17 UTC

[iotdb] branch master updated: Support create sg and timeseries by session API in new cluster (#5836)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0604e5057d Support create sg and timeseries by session API in new cluster (#5836)
0604e5057d is described below

commit 0604e5057d18a56b4624492edec422b04c6a451f
Author: Haonan <hh...@outlook.com>
AuthorDate: Tue May 10 13:50:12 2022 +0800

    Support create sg and timeseries by session API in new cluster (#5836)
---
 .../db/mpp/plan/parser/StatementGenerator.java     |  38 +++++-
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 127 ++++++++++++++++++++-
 2 files changed, 161 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 5a4eeb0153..51726de6ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -36,12 +36,15 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatemen
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
 import org.apache.iotdb.db.qp.sql.SqlLexer;
 import org.apache.iotdb.db.qp.strategy.SQLParseError;
 import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -323,8 +326,15 @@ public class StatementGenerator {
     return insertStatement;
   }
 
+  public static Statement createStatement(String storageGroup) throws IllegalPathException {
+    // construct set storage group statement
+    SetStorageGroupStatement statement = new SetStorageGroupStatement();
+    statement.setStorageGroupPath(new PartialPath(storageGroup));
+    return statement;
+  }
+
   public static Statement createStatement(TSCreateTimeseriesReq req) throws IllegalPathException {
-    // construct create timseries statement
+    // construct create timeseries statement
     CreateTimeSeriesStatement statement = new CreateTimeSeriesStatement();
     statement.setPath(new PartialPath(req.path));
     statement.setDataType(TSDataType.values()[req.dataType]);
@@ -337,6 +347,32 @@ public class StatementGenerator {
     return statement;
   }
 
+  public static Statement createStatement(TSCreateAlignedTimeseriesReq req)
+      throws IllegalPathException {
+    // construct create aligned timeseries statement
+    CreateAlignedTimeSeriesStatement statement = new CreateAlignedTimeSeriesStatement();
+    statement.setDevicePath(new PartialPath(req.prefixPath));
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (int dataType : req.dataTypes) {
+      dataTypes.add(TSDataType.values()[dataType]);
+    }
+    List<TSEncoding> encodings = new ArrayList<>();
+    for (int encoding : req.encodings) {
+      encodings.add(TSEncoding.values()[encoding]);
+    }
+    List<CompressionType> compressors = new ArrayList<>();
+    for (int compressor : req.compressors) {
+      compressors.add(CompressionType.values()[compressor]);
+    }
+    statement.setDataTypes(dataTypes);
+    statement.setEncodings(encodings);
+    statement.setCompressors(compressors);
+    statement.setTagsList(req.tagsList);
+    statement.setAttributesList(req.attributesList);
+    statement.setAliasList(req.measurementAlias);
+    return statement;
+  }
+
   private static Statement invokeParser(String sql, ZoneId zoneId) {
     ASTVisitor astVisitor = new ASTVisitor();
     astVisitor.setZoneId(zoneId);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 65f3264b6d..c9635fced0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -42,6 +42,9 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.SessionTimeoutManager;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
@@ -261,17 +264,135 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
 
   @Override
   public TSStatus setStorageGroup(long sessionId, String storageGroup) {
-    throw new UnsupportedOperationException();
+    try {
+      if (!SESSION_MANAGER.checkLogin(sessionId)) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session-{} create storage group {}", SESSION_MANAGER.getCurrSessionId(), storageGroup);
+      }
+
+      // Step 1: Create SetStorageGroupStatement
+      SetStorageGroupStatement statement =
+          (SetStorageGroupStatement) StatementGenerator.createStatement(storageGroup);
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
   }
 
   @Override
   public TSStatus createTimeseries(TSCreateTimeseriesReq req) {
-    throw new UnsupportedOperationException();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath());
+      }
+
+      // Step 1: transfer from TSCreateTimeseriesReq to Statement
+      CreateTimeSeriesStatement statement =
+          (CreateTimeSeriesStatement) StatementGenerator.createStatement(req);
+
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
   }
 
   @Override
   public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) {
-    throw new UnsupportedOperationException();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // if measurements.size() == 1, convert to create timeseries
+      if (req.measurements.size() == 1) {
+        return createTimeseries(
+            new TSCreateTimeseriesReq(
+                req.sessionId,
+                req.prefixPath + "." + req.measurements.get(0),
+                req.dataTypes.get(0),
+                req.encodings.get(0),
+                req.compressors.get(0)));
+      }
+
+      if (AUDIT_LOGGER.isDebugEnabled()) {
+        AUDIT_LOGGER.debug(
+            "Session-{} create aligned timeseries {}.{}",
+            SESSION_MANAGER.getCurrSessionId(),
+            req.getPrefixPath(),
+            req.getMeasurements());
+      }
+
+      // Step 1: transfer from CreateAlignedTimeSeriesStatement to Statement
+      CreateAlignedTimeSeriesStatement statement =
+          (CreateAlignedTimeSeriesStatement) StatementGenerator.createStatement(req);
+
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
   }
 
   @Override