You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/14 01:52:03 UTC
[iotdb] 08/11: add model management executor (tmp save)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/MLSQL
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 720bc62ac089820abfdedf55ee00b082db5d3897
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Feb 28 15:58:48 2023 +0800
add model management executor (tmp save)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 3 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 4 +
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 16 ++--
.../config/executor/ClusterConfigTaskExecutor.java | 96 +++++++++++++++++++++-
.../config/metadata/model/ShowModelsTask.java | 7 ++
.../config/metadata/model/ShowTrailsTask.java | 7 ++
.../metadata/model/CreateModelStatement.java | 2 +
7 files changed, 122 insertions(+), 13 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index e727c4566b..109a5b3bbd 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -452,7 +452,8 @@ migrateRegion
// ---- Create Model
createModel
: CREATE AUTO? MODEL modelId=identifier
- WITH attributePair (COMMA attributePair)*
+ WITH MODEL_TASK operator_eq modelTask=attributeValue
+ (COMMA attributePair)*
BEGIN
selectStatement
END
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index f06d70e318..2ddab5fd35 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -394,6 +394,10 @@ MODELS
: M O D E L S
;
+MODEL_TASK
+ : M O D E L '_' T A S K
+ ;
+
NODEID
: N O D E I D
;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index c9c71b85f6..99ebd6fb70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -54,14 +54,14 @@ public class Analyzer {
}
public static void validate(Statement statement) {
- MPPQueryContext context = new MPPQueryContext(mockQueryId);
-
- IPartitionFetcher partitionFetcher;
- ISchemaFetcher schemaFetcher;
- partitionFetcher = ClusterPartitionFetcher.getInstance();
- schemaFetcher = ClusterSchemaFetcher.getInstance();
-
- Analyzer analyzer = new Analyzer(context, partitionFetcher, schemaFetcher);
+ Analyzer analyzer = getAnalyzer();
analyzer.analyze(statement);
}
+
+ public static Analyzer getAnalyzer() {
+ return new Analyzer(
+ new MPPQueryContext(mockQueryId),
+ ClusterPartitionFetcher.getInstance(),
+ ClusterSchemaFetcher.getInstance());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 6c6a07e313..8730967fd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -47,6 +48,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
@@ -67,11 +69,15 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.db.client.ConfigNodeClient;
@@ -84,6 +90,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CountStorageGroupTask;
@@ -101,11 +108,14 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowRegionTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTTLTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTriggersTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowVariablesTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.model.ShowModelsTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.model.ShowTrailsTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowNodesInSchemaTemplateTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowPathSetTemplateTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchemaTemplateTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
@@ -169,6 +179,7 @@ import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -1638,21 +1649,98 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> createModel(CreateModelStatement createModelStatement) {
- return null;
+ createModelStatement.semanticCheck();
+
+ Analyzer analyzer = Analyzer.getAnalyzer();
+ Analysis analysis = analyzer.analyze(createModelStatement.getQueryStatement());
+
+ List<String> queryExpressions = new ArrayList<>();
+ for (Expression expression : analysis.getSelectExpressions()) {
+ queryExpressions.add(expression.toString());
+ }
+ String queryFilter = analysis.getWhereExpression().toString();
+
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ TCreateModelReq createModelReq = new TCreateModelReq();
+ createModelReq.setModelId(createModelStatement.getModelId());
+ createModelReq.setQueryExpressions(queryExpressions);
+ createModelReq.setQueryFilter(queryFilter);
+ createModelReq.setIsAuto(createModelStatement.isAuto());
+ createModelReq.setModelConfigs(createModelStatement.getAttributes());
+ final TSStatus executionStatus = client.createModel(createModelReq);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+ LOGGER.warn(
+ "[{}] Failed to create model {}. TSStatus is {}",
+ executionStatus,
+ createModelStatement.getModelId(),
+ executionStatus.message);
+ future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (ClientManagerException | TException e) {
+ future.setException(e);
+ }
+ return future;
}
@Override
public SettableFuture<ConfigTaskResult> dropModel(String modelId) {
- return null;
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ final TSStatus executionStatus = client.dropModel(new TDropModelReq(modelId));
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+ LOGGER.warn("[{}] Failed to drop model {}.", executionStatus, modelId);
+ future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (ClientManagerException | TException e) {
+ future.setException(e);
+ }
+ return future;
}
@Override
public SettableFuture<ConfigTaskResult> showModels() {
- return null;
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ TShowModelResp showModelResp = client.showModel(new TShowModelReq());
+ if (showModelResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.setException(
+ new IoTDBException(showModelResp.getStatus().message, showModelResp.getStatus().code));
+ return future;
+ }
+ // convert model info list and buildTsBlock
+ ShowModelsTask.buildTsBlock(showModelResp.getModelInfoList(), future);
+ } catch (ClientManagerException | TException e) {
+ future.setException(e);
+ }
+
+ return future;
}
@Override
public SettableFuture<ConfigTaskResult> showTrails(String modelId) {
- return null;
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ TShowTrailResp showTrailResp = client.showTrail(new TShowTrailReq(modelId));
+ if (showTrailResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.setException(
+ new IoTDBException(showTrailResp.getStatus().message, showTrailResp.getStatus().code));
+ return future;
+ }
+ // convert trail info list and buildTsBlock
+ ShowTrailsTask.buildTsBlock(showTrailResp.getTrailInfoList(), future);
+ } catch (ClientManagerException | TException e) {
+ future.setException(e);
+ }
+
+ return future;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java
index 10b7a41ce0..7f7a3d1db1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java
@@ -24,6 +24,10 @@ import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.nio.ByteBuffer;
+import java.util.List;
public class ShowModelsTask implements IConfigTask {
@@ -34,4 +38,7 @@ public class ShowModelsTask implements IConfigTask {
throws InterruptedException {
return configTaskExecutor.showModels();
}
+
+ public static void buildTsBlock(
+ List<ByteBuffer> modelInfoList, SettableFuture<ConfigTaskResult> future) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java
index f4c9c6388f..948a8e91fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java
@@ -24,6 +24,10 @@ import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.nio.ByteBuffer;
+import java.util.List;
public class ShowTrailsTask implements IConfigTask {
@@ -38,4 +42,7 @@ public class ShowTrailsTask implements IConfigTask {
throws InterruptedException {
return configTaskExecutor.showTrails(modelId);
}
+
+ public static void buildTsBlock(
+ List<ByteBuffer> trailInfoList, SettableFuture<ConfigTaskResult> future) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/model/CreateModelStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/model/CreateModelStatement.java
index cc91a2f027..11b552c903 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/model/CreateModelStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/model/CreateModelStatement.java
@@ -71,6 +71,8 @@ public class CreateModelStatement extends Statement implements IConfigStatement
this.queryStatement = queryStatement;
}
+ public void semanticCheck() {}
+
@Override
public List<? extends PartialPath> getPaths() {
return Collections.emptyList();