You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/14 01:52:03 UTC

[iotdb] 08/11: add model management executor (tmp save)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/MLSQL
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 720bc62ac089820abfdedf55ee00b082db5d3897
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Feb 28 15:58:48 2023 +0800

    add model management executor (tmp save)
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  3 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |  4 +
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 16 ++--
 .../config/executor/ClusterConfigTaskExecutor.java | 96 +++++++++++++++++++++-
 .../config/metadata/model/ShowModelsTask.java      |  7 ++
 .../config/metadata/model/ShowTrailsTask.java      |  7 ++
 .../metadata/model/CreateModelStatement.java       |  2 +
 7 files changed, 122 insertions(+), 13 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index e727c4566b..109a5b3bbd 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -452,7 +452,8 @@ migrateRegion
 // ---- Create Model
 createModel
     : CREATE AUTO? MODEL modelId=identifier
-        WITH attributePair (COMMA attributePair)*
+        WITH MODEL_TASK operator_eq modelTask=attributeValue
+            (COMMA attributePair)*
         BEGIN
             selectStatement
         END
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index f06d70e318..2ddab5fd35 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -394,6 +394,10 @@ MODELS
     : M O D E L S
     ;
 
+MODEL_TASK
+    : M O D E L '_' T A S K
+    ;
+
 NODEID
     : N O D E I D
     ;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index c9c71b85f6..99ebd6fb70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -54,14 +54,14 @@ public class Analyzer {
   }
 
   public static void validate(Statement statement) {
-    MPPQueryContext context = new MPPQueryContext(mockQueryId);
-
-    IPartitionFetcher partitionFetcher;
-    ISchemaFetcher schemaFetcher;
-    partitionFetcher = ClusterPartitionFetcher.getInstance();
-    schemaFetcher = ClusterSchemaFetcher.getInstance();
-
-    Analyzer analyzer = new Analyzer(context, partitionFetcher, schemaFetcher);
+    Analyzer analyzer = getAnalyzer();
     analyzer.analyze(statement);
   }
+
+  public static Analyzer getAnalyzer() {
+    return new Analyzer(
+        new MPPQueryContext(mockQueryId),
+        ClusterPartitionFetcher.getInstance(),
+        ClusterSchemaFetcher.getInstance());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 6c6a07e313..8730967fd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -47,6 +48,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
@@ -67,11 +69,15 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
@@ -84,6 +90,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CountStorageGroupTask;
@@ -101,11 +108,14 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowRegionTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTTLTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTriggersTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowVariablesTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.model.ShowModelsTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.model.ShowTrailsTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowNodesInSchemaTemplateTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowPathSetTemplateTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchemaTemplateTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
@@ -169,6 +179,7 @@ import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1638,21 +1649,98 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
 
   @Override
   public SettableFuture<ConfigTaskResult> createModel(CreateModelStatement createModelStatement) {
-    return null;
+    createModelStatement.semanticCheck();
+
+    Analyzer analyzer = Analyzer.getAnalyzer();
+    Analysis analysis = analyzer.analyze(createModelStatement.getQueryStatement());
+
+    List<String> queryExpressions = new ArrayList<>();
+    for (Expression expression : analysis.getSelectExpressions()) {
+      queryExpressions.add(expression.toString());
+    }
+    String queryFilter = analysis.getWhereExpression().toString();
+
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+      TCreateModelReq createModelReq = new TCreateModelReq();
+      createModelReq.setModelId(createModelStatement.getModelId());
+      createModelReq.setQueryExpressions(queryExpressions);
+      createModelReq.setQueryFilter(queryFilter);
+      createModelReq.setIsAuto(createModelStatement.isAuto());
+      createModelReq.setModelConfigs(createModelStatement.getAttributes());
+      final TSStatus executionStatus = client.createModel(createModelReq);
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+        LOGGER.warn(
+            "[{}] Failed to create model {}. TSStatus is {}",
+            executionStatus,
+            createModelStatement.getModelId(),
+            executionStatus.message);
+        future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (ClientManagerException | TException e) {
+      future.setException(e);
+    }
+    return future;
   }
 
   @Override
   public SettableFuture<ConfigTaskResult> dropModel(String modelId) {
-    return null;
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+      final TSStatus executionStatus = client.dropModel(new TDropModelReq(modelId));
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+        LOGGER.warn("[{}] Failed to drop model {}.", executionStatus, modelId);
+        future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (ClientManagerException | TException e) {
+      future.setException(e);
+    }
+    return future;
   }
 
   @Override
   public SettableFuture<ConfigTaskResult> showModels() {
-    return null;
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+      TShowModelResp showModelResp = client.showModel(new TShowModelReq());
+      if (showModelResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        future.setException(
+            new IoTDBException(showModelResp.getStatus().message, showModelResp.getStatus().code));
+        return future;
+      }
+      // convert model info list and buildTsBlock
+      ShowModelsTask.buildTsBlock(showModelResp.getModelInfoList(), future);
+    } catch (ClientManagerException | TException e) {
+      future.setException(e);
+    }
+
+    return future;
   }
 
   @Override
   public SettableFuture<ConfigTaskResult> showTrails(String modelId) {
-    return null;
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+      TShowTrailResp showTrailResp = client.showTrail(new TShowTrailReq(modelId));
+      if (showTrailResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        future.setException(
+            new IoTDBException(showTrailResp.getStatus().message, showTrailResp.getStatus().code));
+        return future;
+      }
+      // convert trail info list and buildTsBlock
+      ShowTrailsTask.buildTsBlock(showTrailResp.getTrailInfoList(), future);
+    } catch (ClientManagerException | TException e) {
+      future.setException(e);
+    }
+
+    return future;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java
index 10b7a41ce0..7f7a3d1db1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java
@@ -24,6 +24,10 @@ import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.nio.ByteBuffer;
+import java.util.List;
 
 public class ShowModelsTask implements IConfigTask {
 
@@ -34,4 +38,7 @@ public class ShowModelsTask implements IConfigTask {
       throws InterruptedException {
     return configTaskExecutor.showModels();
   }
+
+  public static void buildTsBlock(
+      List<ByteBuffer> modelInfoList, SettableFuture<ConfigTaskResult> future) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java
index f4c9c6388f..948a8e91fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java
@@ -24,6 +24,10 @@ import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.nio.ByteBuffer;
+import java.util.List;
 
 public class ShowTrailsTask implements IConfigTask {
 
@@ -38,4 +42,7 @@ public class ShowTrailsTask implements IConfigTask {
       throws InterruptedException {
     return configTaskExecutor.showTrails(modelId);
   }
+
+  public static void buildTsBlock(
+      List<ByteBuffer> trailInfoList, SettableFuture<ConfigTaskResult> future) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/model/CreateModelStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/model/CreateModelStatement.java
index cc91a2f027..11b552c903 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/model/CreateModelStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/model/CreateModelStatement.java
@@ -71,6 +71,8 @@ public class CreateModelStatement extends Statement implements IConfigStatement
     this.queryStatement = queryStatement;
   }
 
+  public void semanticCheck() {}
+
   @Override
   public List<? extends PartialPath> getPaths() {
     return Collections.emptyList();