You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/02/02 03:10:13 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5399] Implement batch auto create schema (#8936)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new bce665cd8b [To rel/1.0][IOTDB-5399] Implement batch auto create schema (#8936)
bce665cd8b is described below
commit bce665cd8b36807f05341d700c604b3bb7892c53
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Thu Feb 2 11:10:07 2023 +0800
[To rel/1.0][IOTDB-5399] Implement batch auto create schema (#8936)
---
.../iotdb/db/client/DataNodeInternalClient.java | 4 +-
.../metadata/visitor/SchemaExecutionVisitor.java | 61 +++
.../execution/executor/RegionWriteExecutor.java | 139 ++++-
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 2 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 44 ++
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 2 +
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 599 ---------------------
.../mpp/plan/analyze/StandaloneSchemaFetcher.java | 1 +
.../analyze/schema/AutoCreateSchemaExecutor.java | 418 ++++++++++++++
.../analyze/schema/ClusterSchemaFetchExecutor.java | 211 ++++++++
.../plan/analyze/schema/ClusterSchemaFetcher.java | 363 +++++++++++++
.../plan/analyze/{ => schema}/ISchemaFetcher.java | 2 +-
.../plan/analyze/{ => schema}/SchemaValidator.java | 3 +-
.../iotdb/db/mpp/plan/constant/StatementType.java | 3 +
.../db/mpp/plan/execution/QueryExecution.java | 2 +-
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 32 ++
.../mpp/plan/planner/plan/node/PlanNodeType.java | 10 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 10 +
.../write/InternalBatchActivateTemplateNode.java | 162 ++++++
.../write/InternalCreateMultiTimeSeriesNode.java | 164 ++++++
.../db/mpp/plan/scheduler/StandaloneScheduler.java | 2 +-
.../db/mpp/plan/statement/StatementVisitor.java | 12 +
.../InternalBatchActivateTemplateStatement.java | 59 ++
.../InternalCreateMultiTimeSeriesStatement.java | 57 ++
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 4 +-
.../protocol/rest/impl/GrafanaApiServiceImpl.java | 4 +-
.../db/protocol/rest/impl/RestApiServiceImpl.java | 4 +-
.../metrics/IoTDBInternalLocalReporter.java | 4 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 4 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 4 +-
.../java/org/apache/iotdb/db/sync/SyncService.java | 2 +-
.../iotdb/db/sync/pipedata/load/ILoader.java | 4 +-
.../db/sync/transport/server/ReceiverManager.java | 2 +-
.../db/wal/recover/file/TsFilePlanRedoer.java | 2 +-
.../db/mpp/plan/StandaloneCoordinatorTest.java | 2 +-
.../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 1 +
.../iotdb/db/mpp/plan/plan/distribution/Util.java | 2 +-
37 files changed, 1774 insertions(+), 627 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
index 419af63350..ecab514495 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -29,11 +29,11 @@ import org.apache.iotdb.db.exception.IntoProcessException;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.query.control.SessionManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index 075e48db48..ca426250d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanFactory;
import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.write.ICreateTimeSeriesPlan;
@@ -41,6 +42,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMulti
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode;
@@ -51,6 +54,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -162,6 +166,39 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
+ @Override // Executes an internal multi-device timeseries creation node against one schema region.
+ public TSStatus visitInternalCreateMultiTimeSeries(
+ InternalCreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
+ PartialPath devicePath;
+ MeasurementGroup measurementGroup;
+
+ List<TSStatus> alreadyExistingTimeseries = new ArrayList<>(); // handed to the create helpers below; presumably collects "already exists" statuses
+ List<TSStatus> failingStatus = new ArrayList<>(); // handed to the create helpers below; presumably collects all other failures
+
+ for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry :
+ node.getDeviceMap().entrySet()) { // one entry per device: key = device path, value = (flag, measurement group)
+ devicePath = deviceEntry.getKey();
+ measurementGroup = deviceEntry.getValue().right;
+ if (deviceEntry.getValue().left) { // a true flag routes the group to the aligned-creation helper
+ executeInternalCreateAlignedTimeseries(
+ devicePath, measurementGroup, schemaRegion, alreadyExistingTimeseries, failingStatus);
+ } else {
+ executeInternalCreateTimeseries(
+ devicePath, measurementGroup, schemaRegion, alreadyExistingTimeseries, failingStatus);
+ }
+ }
+
+ if (!failingStatus.isEmpty()) { // failures take precedence: already-exist statuses are not returned in this case
+ return RpcUtils.getStatus(failingStatus);
+ }
+
+ if (!alreadyExistingTimeseries.isEmpty()) { // no failures, but some entries were collected as already existing
+ return RpcUtils.getStatus(alreadyExistingTimeseries);
+ }
+
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); // both lists empty
+ }
+
private void executeInternalCreateTimeseries(
PartialPath devicePath,
MeasurementGroup measurementGroup,
@@ -289,6 +326,30 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
}
}
+ @Override // Activates a schema template on every path listed in the node, within one schema region.
+ public TSStatus visitInternalBatchActivateTemplate(
+ InternalBatchActivateTemplateNode node, ISchemaRegion schemaRegion) {
+ for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
+ node.getTemplateActivationMap().entrySet()) { // key = path to activate; value = pair of ints (left is used as the template id below)
+ Template template = ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left); // NOTE(review): no null check is visible here - confirm getTemplate cannot return null
+ try {
+ schemaRegion.activateSchemaTemplate(
+ SchemaRegionPlanFactory.getActivateTemplateInClusterPlan(
+ entry.getKey(), entry.getValue().right, entry.getValue().left), // presumably (path, template set level, template id) - TODO confirm against the factory signature
+ template);
+ } catch (TemplateIsInUseException e) { // treated as benign: logged at info level and the loop continues with the next entry
+ logger.info(
+ String.format(
+ "Schema template has already been activated on path %s, there's no need to activate again.",
+ entry.getKey()));
+ } catch (MetadataException e) { // any other metadata failure aborts the batch immediately
+ logger.error(e.getMessage(), e);
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); // NOTE(review): entries after this one are never attempted
+ }
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); // reached when every entry activated or was already active
+ }
+
@Override
public TSStatus visitConstructSchemaBlackList(
ConstructSchemaBlackListNode node, ISchemaRegion schemaRegion) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index c1f46626e7..b237707483 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -39,13 +39,15 @@ import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
@@ -67,6 +69,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -501,6 +504,107 @@ public class RegionWriteExecutor {
}
}
+ @Override // Pre-validates an internal multi-device timeseries creation before delegating to the default write path.
+ public RegionExecutionResult visitInternalCreateMultiTimeSeries(
+ InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+ ISchemaRegion schemaRegion =
+ SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) context.getRegionId());
+ if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { // the existence pre-check runs only for this protocol; otherwise see the else branch at the end
+ context.getRegionWriteValidationRWLock().writeLock().lock(); // held for check + execution; released in the finally block
+ try {
+ List<TSStatus> failingStatus = new ArrayList<>(); // non-"already exist" errors
+ List<TSStatus> alreadyExistingStatus = new ArrayList<>(); // TIMESERIES_ALREADY_EXIST statuses
+
+ MeasurementGroup measurementGroup;
+ Map<Integer, MetadataException> failingMeasurementMap;
+ MetadataException metadataException;
+ for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry :
+ node.getDeviceMap().entrySet()) { // pre-check each device's measurement group
+ measurementGroup = deviceEntry.getValue().right;
+ failingMeasurementMap =
+ schemaRegion.checkMeasurementExistence( // result is keyed by Integer; the key set is later passed to removeMeasurements
+ deviceEntry.getKey(),
+ measurementGroup.getMeasurements(),
+ measurementGroup.getAliasList());
+ // filter failed measurement and keep the rest for execution
+ for (Map.Entry<Integer, MetadataException> failingMeasurement :
+ failingMeasurementMap.entrySet()) {
+ metadataException = failingMeasurement.getValue();
+ if (metadataException.getErrorCode()
+ == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+ LOGGER.info(
+ "There's no need to internal create timeseries. {}",
+ failingMeasurement.getValue().getMessage());
+ alreadyExistingStatus.add(
+ RpcUtils.getStatus(
+ metadataException.getErrorCode(),
+ MeasurementPath.transformDataToString( // the status message carries the existing path in transformed string form, not the exception text
+ ((MeasurementAlreadyExistException) metadataException)
+ .getMeasurementPath())));
+ } else {
+ LOGGER.error("Metadata error: ", metadataException);
+ failingStatus.add(
+ RpcUtils.getStatus(
+ metadataException.getErrorCode(), metadataException.getMessage()));
+ }
+ }
+ measurementGroup.removeMeasurements(failingMeasurementMap.keySet()); // mutates the node's group so only measurements that passed the check are executed
+ }
+
+ RegionExecutionResult executionResult =
+ super.visitInternalCreateMultiTimeSeries(node, context); // runs the node with the filtered groups
+
+ if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) { // nothing was filtered out: return the execution result unchanged
+ return executionResult;
+ }
+
+ TSStatus executionStatus = executionResult.getStatus();
+
+ // separate the measurement_already_exist exception and other exceptions process,
+ // measurement_already_exist exception is acceptable due to concurrent timeseries creation
+ if (failingStatus.isEmpty()) { // pre-check found only already-exist entries
+ if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (executionStatus.getSubStatus().get(0).getCode() // NOTE(review): only the first sub-status is inspected to classify the whole list - confirm sub-statuses are never mixed
+ == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+ // there's only measurement_already_exist exception
+ alreadyExistingStatus.addAll(executionStatus.getSubStatus());
+ } else {
+ failingStatus.addAll(executionStatus.getSubStatus());
+ }
+ } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failingStatus.add(executionStatus);
+ }
+ } else { // pre-check already found real failures; already-exist sub-statuses from execution are not collected here
+ if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (executionStatus.getSubStatus().get(0).getCode()
+ != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+ failingStatus.addAll(executionStatus.getSubStatus());
+ }
+ } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failingStatus.add(executionStatus);
+ }
+ }
+
+ TSStatus status;
+ if (failingStatus.isEmpty()) { // same precedence as above: failures win over already-exist statuses
+ status = RpcUtils.getStatus(alreadyExistingStatus);
+ } else {
+ status = RpcUtils.getStatus(failingStatus);
+ }
+
+ RegionExecutionResult result = new RegionExecutionResult();
+ result.setAccepted(false); // NOTE(review): not accepted even when only already-exist statuses were collected - confirm callers expect this
+ result.setMessage(status.getMessage());
+ result.setStatus(status);
+ return result;
+ } finally {
+ context.getRegionWriteValidationRWLock().writeLock().unlock();
+ }
+ } else {
+ return super.visitInternalCreateMultiTimeSeries(node, context); // other consensus protocols: no pre-check and no lock taken here
+ }
+ }
+
@Override
public RegionExecutionResult visitActivateTemplate(
ActivateTemplateNode node, WritePlanNodeExecutionContext context) {
@@ -527,6 +631,39 @@ public class RegionWriteExecutor {
context.getRegionWriteValidationRWLock().readLock().unlock();
}
}
+
+ @Override // Validates that each path still has a template set on it, then delegates the batch activation.
+ public RegionExecutionResult visitInternalBatchActivateTemplate(
+ InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext context) {
+ // activate template operation shall be blocked by unset template check
+ context.getRegionWriteValidationRWLock().readLock().lock(); // read lock is held across validation and execution; released in finally
+ try {
+ for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
+ node.getTemplateActivationMap().entrySet()) { // key = path to activate; value = pair of ints (right is used as a node index below)
+ Pair<Template, PartialPath> templateSetInfo =
+ ClusterTemplateManager.getInstance().checkTemplateSetInfo(entry.getKey());
+ if (templateSetInfo == null) { // the first path without template-set info rejects the whole batch
+ // The activation has already been validated during analyzing.
+ // That means the template is being unset during the activation plan transport.
+ RegionExecutionResult result = new RegionExecutionResult();
+ result.setAccepted(false);
+ String message =
+ String.format(
+ "Template is being unsetting from prefix path of %s. Please try activating later.",
+ new PartialPath(
+ Arrays.copyOf(entry.getKey().getNodes(), entry.getValue().right + 1)) // prefix made of the first (right + 1) nodes of the path; presumably right is the template set level - TODO confirm
+ .getFullPath());
+ result.setMessage(message);
+ result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message));
+ return result;
+ }
+ }
+
+ return super.visitInternalBatchActivateTemplate(node, context); // every path passed the check
+ } finally {
+ context.getRegionWriteValidationRWLock().readLock().unlock();
+ }
+ }
}
private static class WritePlanNodeExecutionContext {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 8d18d611c1..c9706cf049 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.QueryIdGenerator;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 637dfe73e2..42c7aedcc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -47,6 +47,8 @@ import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
@@ -77,6 +79,8 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalBatchActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -1603,6 +1607,26 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
return analysis;
}
+ @Override // Analyzes an internal multi-device timeseries creation statement: marks it as a write and resolves schema partitions.
+ public Analysis visitInternalCreateMultiTimeSeries(
+ InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement,
+ MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
+
+ Analysis analysis = new Analysis();
+ analysis.setStatement(internalCreateMultiTimeSeriesStatement);
+
+ PathPatternTree pathPatternTree = new PathPatternTree();
+ for (PartialPath devicePath : internalCreateMultiTimeSeriesStatement.getDeviceMap().keySet()) { // one pattern per device in the statement
+ pathPatternTree.appendFullPath(devicePath.concatNode(ONE_LEVEL_PATH_WILDCARD)); // device path with ONE_LEVEL_PATH_WILDCARD appended as its last node
+ }
+
+ SchemaPartition schemaPartitionInfo;
+ schemaPartitionInfo = partitionFetcher.getOrCreateSchemaPartition(pathPatternTree); // presumably creates partitions that are missing, per the method name - confirm against IPartitionFetcher
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+ return analysis;
+ }
+
@Override
public Analysis visitCreateMultiTimeseries(
CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
@@ -2490,6 +2514,26 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
return analysis;
}
+ @Override // Analyzes an internal batch template activation statement: marks it as a write and resolves schema partitions.
+ public Analysis visitInternalBatchActivateTemplate(
+ InternalBatchActivateTemplateStatement internalBatchActivateTemplateStatement,
+ MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
+ Analysis analysis = new Analysis();
+ analysis.setStatement(internalBatchActivateTemplateStatement);
+
+ PathPatternTree patternTree = new PathPatternTree();
+ for (PartialPath activatePath :
+ internalBatchActivateTemplateStatement.getDeviceMap().keySet()) { // one pattern per path to be activated
+ patternTree.appendPathPattern(activatePath.concatNode(ONE_LEVEL_PATH_WILDCARD)); // activation path with ONE_LEVEL_PATH_WILDCARD appended as its last node
+ }
+ SchemaPartition partition = partitionFetcher.getOrCreateSchemaPartition(patternTree); // presumably creates partitions that are missing, per the method name - confirm against IPartitionFetcher
+
+ analysis.setSchemaPartitionInfo(partition);
+
+ return analysis;
+ }
+
@Override
public Analysis visitShowPathsUsingTemplate(
ShowPathsUsingTemplateStatement showPathsUsingTemplateStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 010b491e55..40d032b3dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import static org.apache.iotdb.db.mpp.common.QueryId.mockQueryId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
deleted file mode 100644
index 16a49ae47a..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ /dev/null
@@ -1,599 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.plan.analyze;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
-import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
-import org.apache.iotdb.db.metadata.template.ITemplateManager;
-import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
-import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
-import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
-import org.apache.iotdb.db.mpp.plan.Coordinator;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
-import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
-import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-
-public class ClusterSchemaFetcher implements ISchemaFetcher {
-
- private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
- private final Coordinator coordinator = Coordinator.getInstance();
- private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
- private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
-
- private static final class ClusterSchemaFetcherHolder {
- private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
-
- private ClusterSchemaFetcherHolder() {}
- }
-
- public static ClusterSchemaFetcher getInstance() {
- return ClusterSchemaFetcherHolder.INSTANCE;
- }
-
- private ClusterSchemaFetcher() {}
-
- @Override
- public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
- return fetchSchema(patternTree, false);
- }
-
- @Override
- public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
- return fetchSchema(patternTree, true);
- }
-
- private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean withTags) {
- Map<Integer, Template> templateMap = new HashMap<>();
- patternTree.constructTree();
- List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
- for (PartialPath pattern : pathPatternList) {
- templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
- }
-
- if (withTags) {
- return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
- }
-
- List<PartialPath> fullPathList = new ArrayList<>();
- for (PartialPath pattern : pathPatternList) {
- if (!pattern.hasWildcard()) {
- fullPathList.add(pattern);
- }
- }
-
- if (fullPathList.isEmpty()) {
- return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
- }
-
- // The schema cache R/W and fetch operation must be locked together thus the cache clean
- // operation executed by delete timeseries will be effective.
- schemaCache.takeReadLock();
- try {
- ClusterSchemaTree schemaTree;
- if (fullPathList.size() == pathPatternList.size()) {
- boolean isAllCached = true;
- schemaTree = new ClusterSchemaTree();
- ClusterSchemaTree cachedSchema;
- Set<String> storageGroupSet = new HashSet<>();
- for (PartialPath fullPath : fullPathList) {
- cachedSchema = schemaCache.get(fullPath);
- if (cachedSchema.isEmpty()) {
- isAllCached = false;
- break;
- } else {
- schemaTree.mergeSchemaTree(cachedSchema);
- storageGroupSet.addAll(cachedSchema.getDatabases());
- }
- }
- if (isAllCached) {
- schemaTree.setDatabases(storageGroupSet);
- return schemaTree;
- }
- }
-
- schemaTree =
- executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
-
- // only cache the schema fetched by full path
- List<MeasurementPath> measurementPathList;
- for (PartialPath fullPath : fullPathList) {
- measurementPathList = schemaTree.searchMeasurementPaths(fullPath).left;
- if (measurementPathList.isEmpty()) {
- continue;
- }
- schemaCache.put(
- schemaTree.getBelongedDatabase(measurementPathList.get(0)), measurementPathList.get(0));
- }
- return schemaTree;
- } finally {
- schemaCache.releaseReadLock();
- }
- }
-
- private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
- long queryId = SessionManager.getInstance().requestQueryId();
- try {
- ExecutionResult executionResult =
- coordinator.execute(
- schemaFetchStatement,
- queryId,
- null,
- "",
- ClusterPartitionFetcher.getInstance(),
- this,
- config.getQueryTimeoutThreshold());
- if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new RuntimeException(
- String.format(
- "cannot fetch schema, status is: %s, msg is: %s",
- executionResult.status.getCode(), executionResult.status.getMessage()));
- }
- try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
- ClusterSchemaTree result = new ClusterSchemaTree();
- Set<String> databaseSet = new HashSet<>();
- while (coordinator.getQueryExecution(queryId).hasNextResult()) {
- // The query will be transited to FINISHED when invoking getBatchResult() at the last time
- // So we don't need to clean up it manually
- Optional<TsBlock> tsBlock;
- try {
- tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
- } catch (IoTDBException e) {
- throw new RuntimeException("Fetch Schema failed. ", e);
- }
- if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
- break;
- }
- Column column = tsBlock.get().getColumn(0);
- for (int i = 0; i < column.getPositionCount(); i++) {
- parseFetchedData(column.getBinary(i), result, databaseSet);
- }
- }
- result.setDatabases(databaseSet);
- return result;
- }
- } finally {
- coordinator.cleanupQueryExecution(queryId);
- }
- }
-
- private void parseFetchedData(
- Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
- InputStream inputStream = new ByteArrayInputStream(data.getValues());
- try {
- byte type = ReadWriteIOUtils.readByte(inputStream);
- if (type == 0) {
- int size = ReadWriteIOUtils.readInt(inputStream);
- for (int i = 0; i < size; i++) {
- databaseSet.add(ReadWriteIOUtils.readString(inputStream));
- }
- } else if (type == 1) {
- resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
- } else {
- throw new RuntimeException(
- new MetadataException("Failed to fetch schema because of unrecognized data"));
- }
- } catch (IOException e) {
- // Totally memory operation. This case won't happen.
- }
- }
-
- @Override
- public ISchemaTree fetchSchemaWithAutoCreate(
- PartialPath devicePath,
- String[] measurements,
- Function<Integer, TSDataType> getDataType,
- boolean isAligned) {
- // The schema cache R/W and fetch operation must be locked together thus the cache clean
- // operation executed by delete timeseries will be effective.
- schemaCache.takeReadLock();
- try {
- ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
- List<Integer> indexOfMissingMeasurements =
- checkMissingMeasurements(schemaTree, devicePath, measurements);
-
- // all schema can be taken from cache
- if (indexOfMissingMeasurements.isEmpty()) {
- return schemaTree;
- }
-
- // try fetch the missing schema from remote and cache fetched schema
- PathPatternTree patternTree = new PathPatternTree();
- for (int index : indexOfMissingMeasurements) {
- patternTree.appendFullPath(devicePath, measurements[index]);
- }
- ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
- if (!remoteSchemaTree.isEmpty()) {
- schemaTree.mergeSchemaTree(remoteSchemaTree);
- schemaCache.put(remoteSchemaTree);
- }
-
- if (!config.isAutoCreateSchemaEnabled()) {
- return schemaTree;
- }
-
- // auto create the still missing schema and merge them into schemaTree
- checkAndAutoCreateMissingMeasurements(
- schemaTree,
- devicePath,
- indexOfMissingMeasurements,
- measurements,
- getDataType,
- null,
- null,
- isAligned);
-
- return schemaTree;
- } finally {
- schemaCache.releaseReadLock();
- }
- }
-
- @Override
- public ISchemaTree fetchSchemaListWithAutoCreate(
- List<PartialPath> devicePathList,
- List<String[]> measurementsList,
- List<TSDataType[]> tsDataTypesList,
- List<Boolean> isAlignedList) {
- return fetchSchemaListWithAutoCreate(
- devicePathList, measurementsList, tsDataTypesList, null, null, isAlignedList);
- }
-
- @Override
- public ISchemaTree fetchSchemaListWithAutoCreate(
- List<PartialPath> devicePathList,
- List<String[]> measurementsList,
- List<TSDataType[]> tsDataTypesList,
- List<TSEncoding[]> encodingsList,
- List<CompressionType[]> compressionTypesList,
- List<Boolean> isAlignedList) {
- // The schema cache R/W and fetch operation must be locked together thus the cache clean
- // operation executed by delete timeseries will be effective.
- schemaCache.takeReadLock();
- try {
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- PathPatternTree patternTree = new PathPatternTree();
- List<List<Integer>> indexOfMissingMeasurementsList = new ArrayList<>(devicePathList.size());
- for (int i = 0; i < devicePathList.size(); i++) {
- schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), measurementsList.get(i)));
- List<Integer> indexOfMissingMeasurements =
- checkMissingMeasurements(schemaTree, devicePathList.get(i), measurementsList.get(i));
- indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
- for (int index : indexOfMissingMeasurements) {
- patternTree.appendFullPath(devicePathList.get(i), measurementsList.get(i)[index]);
- }
- }
-
- // all schema can be taken from cache
- if (patternTree.isEmpty()) {
- return schemaTree;
- }
-
- // try fetch the missing schema from remote and cache fetched schema
- ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
- if (!remoteSchemaTree.isEmpty()) {
- schemaTree.mergeSchemaTree(remoteSchemaTree);
- schemaCache.put(remoteSchemaTree);
- }
-
- if (!config.isAutoCreateSchemaEnabled()) {
- return schemaTree;
- }
-
- // auto create the still missing schema and merge them into schemaTree
- for (int i = 0; i < devicePathList.size(); i++) {
- int finalI = i;
- checkAndAutoCreateMissingMeasurements(
- schemaTree,
- devicePathList.get(i),
- indexOfMissingMeasurementsList.get(i),
- measurementsList.get(i),
- index -> tsDataTypesList.get(finalI)[index],
- encodingsList == null ? null : encodingsList.get(i),
- compressionTypesList == null ? null : compressionTypesList.get(i),
- isAlignedList.get(i));
- }
- return schemaTree;
- } finally {
- schemaCache.releaseReadLock();
- }
- }
-
- @Override
- public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
- return templateManager.checkTemplateSetInfo(path);
- }
-
- @Override
- public Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern) {
- return templateManager.checkAllRelatedTemplate(pathPattern);
- }
-
- @Override
- public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
- return templateManager.getAllPathsSetTemplate(templateName);
- }
-
- // check which measurements are missing and auto create the missing measurements and merge them
- // into given schemaTree
- private void checkAndAutoCreateMissingMeasurements(
- ClusterSchemaTree schemaTree,
- PartialPath devicePath,
- List<Integer> indexOfMissingMeasurements,
- String[] measurements,
- Function<Integer, TSDataType> getDataType,
- TSEncoding[] encodings,
- CompressionType[] compressionTypes,
- boolean isAligned) {
- // check missing measurements
- DeviceSchemaInfo deviceSchemaInfo =
- schemaTree.searchDeviceSchemaInfo(
- devicePath,
- indexOfMissingMeasurements.stream()
- .map(index -> measurements[index])
- .collect(Collectors.toList()));
- if (deviceSchemaInfo != null) {
- List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
- int removedCount = 0;
- for (int i = 0, size = schemaList.size(); i < size; i++) {
- if (schemaList.get(i) != null) {
- indexOfMissingMeasurements.remove(i - removedCount);
- removedCount++;
- }
- }
- }
- if (indexOfMissingMeasurements.isEmpty()) {
- return;
- }
-
- // check whether there is template should be activated
- Pair<Template, PartialPath> templateInfo = templateManager.checkTemplateSetInfo(devicePath);
- if (templateInfo != null) {
- Template template = templateInfo.left;
- boolean shouldActivateTemplate = false;
- for (int index : indexOfMissingMeasurements) {
- if (template.hasSchema(measurements[index])) {
- shouldActivateTemplate = true;
- break;
- }
- }
-
- if (shouldActivateTemplate) {
- internalActivateTemplate(devicePath);
- List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
- for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
- if (!template.hasSchema(measurements[i])) {
- recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
- }
- }
- indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements;
- for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
- schemaTree.appendSingleMeasurement(
- devicePath.concatNode(entry.getKey()),
- (MeasurementSchema) entry.getValue(),
- null,
- null,
- template.isDirectAligned());
- }
-
- if (indexOfMissingMeasurements.isEmpty()) {
- return;
- }
- }
- }
-
- // auto create the rest missing timeseries
- List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
- List<TSDataType> dataTypesOfMissingMeasurement =
- new ArrayList<>(indexOfMissingMeasurements.size());
- List<TSEncoding> encodingsOfMissingMeasurement =
- new ArrayList<>(indexOfMissingMeasurements.size());
- List<CompressionType> compressionTypesOfMissingMeasurement =
- new ArrayList<>(indexOfMissingMeasurements.size());
- indexOfMissingMeasurements.forEach(
- index -> {
- TSDataType tsDataType = getDataType.apply(index);
- // tsDataType == null means insert null value to a non-exist series
- // should skip creating them
- if (tsDataType != null) {
- missingMeasurements.add(measurements[index]);
- dataTypesOfMissingMeasurement.add(tsDataType);
- encodingsOfMissingMeasurement.add(
- encodings == null ? getDefaultEncoding(tsDataType) : encodings[index]);
- compressionTypesOfMissingMeasurement.add(
- compressionTypes == null
- ? TSFileDescriptor.getInstance().getConfig().getCompressor()
- : compressionTypes[index]);
- }
- });
-
- if (!missingMeasurements.isEmpty()) {
- schemaTree.mergeSchemaTree(
- internalCreateTimeseries(
- devicePath,
- missingMeasurements,
- dataTypesOfMissingMeasurement,
- encodingsOfMissingMeasurement,
- compressionTypesOfMissingMeasurement,
- isAligned));
- }
- }
-
- private List<Integer> checkMissingMeasurements(
- ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
- DeviceSchemaInfo deviceSchemaInfo =
- schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
- if (deviceSchemaInfo == null) {
- return IntStream.range(0, measurements.length).boxed().collect(Collectors.toList());
- }
-
- List<Integer> indexOfMissingMeasurements = new ArrayList<>();
- List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
- for (int i = 0; i < measurements.length; i++) {
- if (schemaList.get(i) == null) {
- indexOfMissingMeasurements.add(i);
- }
- }
-
- return indexOfMissingMeasurements;
- }
-
- // try to create the target timeseries and return schemaTree involving successfully created
- // timeseries and existing timeseries
- private ClusterSchemaTree internalCreateTimeseries(
- PartialPath devicePath,
- List<String> measurements,
- List<TSDataType> tsDataTypes,
- List<TSEncoding> encodings,
- List<CompressionType> compressors,
- boolean isAligned) {
- List<MeasurementPath> measurementPathList =
- executeInternalCreateTimeseriesStatement(
- new InternalCreateTimeSeriesStatement(
- devicePath, measurements, tsDataTypes, encodings, compressors, isAligned));
-
- Set<Integer> alreadyExistingMeasurementIndexSet =
- measurementPathList.stream()
- .map(o -> measurements.indexOf(o.getMeasurement()))
- .collect(Collectors.toSet());
-
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- schemaTree.appendMeasurementPaths(measurementPathList);
-
- for (int i = 0, size = measurements.size(); i < size; i++) {
- if (alreadyExistingMeasurementIndexSet.contains(i)) {
- continue;
- }
-
- schemaTree.appendSingleMeasurement(
- devicePath.concatNode(measurements.get(i)),
- new MeasurementSchema(
- measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)),
- null,
- null,
- isAligned);
- }
-
- return schemaTree;
- }
-
- // auto create timeseries and return the existing timeseries info
- private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
- InternalCreateTimeSeriesStatement statement) {
-
- ExecutionResult executionResult = executeStatement(statement);
-
- int statusCode = executionResult.status.getCode();
- if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return Collections.emptyList();
- }
-
- if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- throw new RuntimeException(
- new IoTDBException(executionResult.status.getMessage(), statusCode));
- }
-
- Set<String> failedCreationSet = new HashSet<>();
- List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
- for (TSStatus subStatus : executionResult.status.subStatus) {
- if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- alreadyExistingMeasurements.add(
- MeasurementPath.parseDataFromString(subStatus.getMessage()));
- } else {
- failedCreationSet.add(subStatus.message);
- }
- }
-
- if (!failedCreationSet.isEmpty()) {
- throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
- }
-
- return alreadyExistingMeasurements;
- }
-
- public void internalActivateTemplate(PartialPath devicePath) {
- ExecutionResult executionResult = executeStatement(new ActivateTemplateStatement(devicePath));
- TSStatus status = executionResult.status;
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
- throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode()));
- }
- }
-
- private ExecutionResult executeStatement(Statement statement) {
- long queryId = SessionManager.getInstance().requestQueryId();
- return coordinator.execute(
- statement,
- queryId,
- null,
- "",
- ClusterPartitionFetcher.getInstance(),
- this,
- config.getQueryTimeoutThreshold());
- }
-
- @Override
- public void invalidAllCache() {
- DataNodeSchemaCache.getInstance().cleanUp();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index f2ab109b79..d8b3cab4d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.mpp.common.schematree.DeviceGroupSchemaTree;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
new file mode 100644
index 0000000000..4b4ef0f795
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalBatchActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
+class AutoCreateSchemaExecutor {
+
+ private final ITemplateManager templateManager;
+ private final Function<Statement, ExecutionResult> statementExecutor;
+
+  /**
+   * Builds an executor that performs auto creation through the supplied collaborators.
+   *
+   * @param templateManager queried for the schema template set on a device path
+   * @param statementExecutor applied to every internal statement; its result status is inspected
+   */
+  AutoCreateSchemaExecutor(
+      ITemplateManager templateManager, Function<Statement, ExecutionResult> statementExecutor) {
+    this.statementExecutor = statementExecutor;
+    this.templateManager = templateManager;
+  }
+
+  /**
+   * Auto creates the missing measurements of one device and merges them into the given schema
+   * tree.
+   *
+   * <p>If a schema template is set for the device and covers at least one target measurement, the
+   * template is activated and all of its measurements are appended to the tree. The remaining
+   * target measurements are created as normal timeseries with the default encoding of their data
+   * type and the configured compressor.
+   *
+   * @param schemaTree tree that receives the activated / created measurements
+   * @param devicePath device owning the measurements
+   * @param indexOfTargetMeasurements indexes into {@code measurements} of the ones to create
+   * @param measurements all measurement names of the request
+   * @param getDataType maps a measurement index to its data type; a {@code null} result means a
+   *     null value is inserted into a non-existing series, so that series is not created
+   * @param isAligned whether the timeseries are created as aligned
+   */
+  void autoCreateMissingMeasurements(
+      ClusterSchemaTree schemaTree,
+      PartialPath devicePath,
+      List<Integer> indexOfTargetMeasurements,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      boolean isAligned) {
+    List<Integer> indexOfMeasurementsToCreate = indexOfTargetMeasurements;
+
+    // check whether a schema template should be activated for this device
+    Pair<Template, PartialPath> templateInfo = templateManager.checkTemplateSetInfo(devicePath);
+    if (templateInfo != null) {
+      Template template = templateInfo.left;
+      indexOfMeasurementsToCreate =
+          checkMeasurementsInSchemaTemplate(
+              devicePath, indexOfTargetMeasurements, measurements, isAligned, template);
+      boolean someMeasurementsInTemplate =
+          indexOfMeasurementsToCreate.size() < indexOfTargetMeasurements.size();
+      if (someMeasurementsInTemplate) {
+        internalActivateTemplate(devicePath);
+        template
+            .getSchemaMap()
+            .forEach(
+                (measurement, schema) ->
+                    schemaTree.appendSingleMeasurement(
+                        devicePath.concatNode(measurement),
+                        (MeasurementSchema) schema,
+                        null,
+                        null,
+                        template.isDirectAligned()));
+      }
+      if (indexOfMeasurementsToCreate.isEmpty()) {
+        // everything requested is covered by the template
+        return;
+      }
+    }
+
+    // collect the measurements that must be created as normal timeseries
+    int capacity = indexOfMeasurementsToCreate.size();
+    List<String> missingMeasurements = new ArrayList<>(capacity);
+    List<TSDataType> dataTypesOfMissingMeasurement = new ArrayList<>(capacity);
+    List<TSEncoding> encodingsOfMissingMeasurement = new ArrayList<>(capacity);
+    List<CompressionType> compressionTypesOfMissingMeasurement = new ArrayList<>(capacity);
+    for (Integer index : indexOfMeasurementsToCreate) {
+      TSDataType tsDataType = getDataType.apply(index);
+      if (tsDataType == null) {
+        // null value inserted into a non-existing series: do not create it
+        continue;
+      }
+      missingMeasurements.add(measurements[index]);
+      dataTypesOfMissingMeasurement.add(tsDataType);
+      encodingsOfMissingMeasurement.add(getDefaultEncoding(tsDataType));
+      compressionTypesOfMissingMeasurement.add(
+          TSFileDescriptor.getInstance().getConfig().getCompressor());
+    }
+
+    if (missingMeasurements.isEmpty()) {
+      return;
+    }
+    internalCreateTimeSeries(
+        schemaTree,
+        devicePath,
+        missingMeasurements,
+        dataTypesOfMissingMeasurement,
+        encodingsOfMissingMeasurement,
+        compressionTypesOfMissingMeasurement,
+        isAligned);
+  }
+
+ void autoCreateMissingMeasurements(
+ ClusterSchemaTree schemaTree,
+ List<PartialPath> devicePathList,
+ List<Integer> indexOfTargetDevices,
+ List<List<Integer>> indexOfTargetMeasurementsList,
+ List<String[]> measurementsList,
+ List<TSDataType[]> tsDataTypesList,
+ List<TSEncoding[]> encodingsList,
+ List<CompressionType[]> compressionTypesList,
+ List<Boolean> isAlignedList) {
+ // check whether there is template should be activated
+
+ Map<PartialPath, Pair<Template, PartialPath>> devicesNeedActivateTemplate = new HashMap<>();
+ Map<PartialPath, Pair<Boolean, MeasurementGroup>> devicesNeedAutoCreateTimeSeries =
+ new HashMap<>();
+ int deviceIndex;
+ PartialPath devicePath;
+ List<Integer> indexOfTargetMeasurements;
+ Pair<Template, PartialPath> templateInfo;
+ Template template;
+ List<Integer> indexOfMeasurementsNotInTemplate;
+ for (int i = 0, size = indexOfTargetDevices.size(); i < size; i++) {
+ deviceIndex = indexOfTargetDevices.get(i);
+ devicePath = devicePathList.get(deviceIndex);
+ indexOfTargetMeasurements = indexOfTargetMeasurementsList.get(i);
+
+ templateInfo = devicesNeedActivateTemplate.get(devicePath);
+ if (templateInfo == null) {
+ templateInfo = templateManager.checkTemplateSetInfo(devicePath);
+ }
+
+ if (templateInfo == null) {
+ indexOfMeasurementsNotInTemplate = indexOfTargetMeasurements;
+ } else {
+ template = templateInfo.left;
+ indexOfMeasurementsNotInTemplate =
+ checkMeasurementsInSchemaTemplate(
+ devicePath,
+ indexOfTargetMeasurements,
+ measurementsList.get(deviceIndex),
+ isAlignedList.get(deviceIndex),
+ template);
+ if (indexOfMeasurementsNotInTemplate.size() < indexOfTargetMeasurements.size()) {
+ // there are measurements in schema template
+ devicesNeedActivateTemplate.putIfAbsent(devicePath, templateInfo);
+ }
+ }
+
+ if (!indexOfMeasurementsNotInTemplate.isEmpty()) {
+ // there are measurements need to be created as normal timeseries
+ int finalDeviceIndex = deviceIndex;
+ List<Integer> finalIndexOfMeasurementsNotInTemplate = indexOfMeasurementsNotInTemplate;
+ devicesNeedAutoCreateTimeSeries.compute(
+ devicePath,
+ (k, v) -> {
+ if (v == null) {
+ v = new Pair<>(isAlignedList.get(finalDeviceIndex), new MeasurementGroup());
+ }
+ MeasurementGroup measurementGroup = v.right;
+ String[] measurements = measurementsList.get(finalDeviceIndex);
+ TSDataType[] tsDataTypes = tsDataTypesList.get(finalDeviceIndex);
+ TSEncoding[] encodings =
+ encodingsList == null ? null : encodingsList.get(finalDeviceIndex);
+ CompressionType[] compressionTypes =
+ compressionTypesList == null ? null : compressionTypesList.get(finalDeviceIndex);
+ for (int measurementIndex : finalIndexOfMeasurementsNotInTemplate) {
+ if (tsDataTypes[measurementIndex] == null) {
+ continue;
+ }
+ measurementGroup.addMeasurement(
+ measurements[measurementIndex],
+ tsDataTypes[measurementIndex],
+ encodings == null
+ ? getDefaultEncoding(tsDataTypes[measurementIndex])
+ : encodings[measurementIndex],
+ compressionTypes == null
+ ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+ : compressionTypes[measurementIndex]);
+ }
+ return v;
+ });
+ }
+ }
+
+ if (!devicesNeedActivateTemplate.isEmpty()) {
+ internalActivateTemplate(devicesNeedActivateTemplate);
+ for (Map.Entry<PartialPath, Pair<Template, PartialPath>> entry :
+ devicesNeedActivateTemplate.entrySet()) {
+ devicePath = entry.getKey();
+ template = entry.getValue().left;
+ for (Map.Entry<String, IMeasurementSchema> measurementEntry :
+ template.getSchemaMap().entrySet()) {
+ schemaTree.appendSingleMeasurement(
+ devicePath.concatNode(measurementEntry.getKey()),
+ (MeasurementSchema) measurementEntry.getValue(),
+ null,
+ null,
+ template.isDirectAligned());
+ }
+ }
+ }
+
+ if (!devicesNeedAutoCreateTimeSeries.isEmpty()) {
+ internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries);
+ }
+ }
+
+  /**
+   * Filters the target measurements down to those the template does not define.
+   *
+   * @param devicePath not read by this method
+   * @param indexOfTargetMeasurements indexes into {@code measurements} to examine
+   * @param measurements measurement names of the device
+   * @param isAligned not read by this method
+   * @param template the schema template to check against
+   * @return {@code indexOfTargetMeasurements} itself when the template defines none of the
+   *     targets; otherwise a new list holding only the indexes the template does not define
+   */
+  private List<Integer> checkMeasurementsInSchemaTemplate(
+      PartialPath devicePath,
+      List<Integer> indexOfTargetMeasurements,
+      String[] measurements,
+      boolean isAligned,
+      Template template) {
+    List<Integer> indexOfMeasurementsNotInTemplate =
+        indexOfTargetMeasurements.stream()
+            .filter(index -> !template.hasSchema(measurements[index]))
+            .collect(Collectors.toCollection(ArrayList::new));
+    // same size means no target is defined by the template, so hand back the caller's list
+    boolean noneInTemplate =
+        indexOfMeasurementsNotInTemplate.size() == indexOfTargetMeasurements.size();
+    return noneInTemplate ? indexOfTargetMeasurements : indexOfMeasurementsNotInTemplate;
+  }
+
+  /**
+   * Tries to create the target timeseries of one device and merges both the timeseries reported
+   * as already existing and the newly created ones into the given schema tree.
+   */
+  private void internalCreateTimeSeries(
+      ClusterSchemaTree schemaTree,
+      PartialPath devicePath,
+      List<String> measurements,
+      List<TSDataType> tsDataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      boolean isAligned) {
+    InternalCreateTimeSeriesStatement statement =
+        new InternalCreateTimeSeriesStatement(
+            devicePath, measurements, tsDataTypes, encodings, compressors, isAligned);
+    List<MeasurementPath> existingPaths = executeInternalCreateTimeseriesStatement(statement);
+
+    // positions in the request of the measurements reported as already existing
+    Set<Integer> existingIndexes = new HashSet<>();
+    for (MeasurementPath existingPath : existingPaths) {
+      existingIndexes.add(measurements.indexOf(existingPath.getMeasurement()));
+    }
+
+    schemaTree.appendMeasurementPaths(existingPaths);
+
+    // everything else was created with exactly the requested schema
+    for (int i = 0; i < measurements.size(); i++) {
+      if (!existingIndexes.contains(i)) {
+        String measurement = measurements.get(i);
+        MeasurementSchema createdSchema =
+            new MeasurementSchema(
+                measurement, tsDataTypes.get(i), encodings.get(i), compressors.get(i));
+        schemaTree.appendSingleMeasurement(
+            devicePath.concatNode(measurement), createdSchema, null, null, isAligned);
+      }
+    }
+  }
+
+  /**
+   * Executes an internal create-timeseries statement.
+   *
+   * @return the timeseries reported as already existing; empty when the statement fully succeeded
+   * @throws RuntimeException if the status is neither success nor a multiple-error status
+   * @throws SemanticException if any sub status is something other than "already exist"
+   */
+  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(Statement statement) {
+    TSStatus status = statementExecutor.apply(statement).status;
+    int statusCode = status.getCode();
+
+    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return Collections.emptyList();
+    }
+    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      throw new RuntimeException(new IoTDBException(status.getMessage(), statusCode));
+    }
+
+    // split the sub statuses into "already exists" (tolerated) and real failures
+    List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
+    Set<String> failedCreationSet = new HashSet<>();
+    for (TSStatus subStatus : status.subStatus) {
+      if (subStatus.code != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+        failedCreationSet.add(subStatus.message);
+        continue;
+      }
+      alreadyExistingMeasurements.add(MeasurementPath.parseDataFromString(subStatus.getMessage()));
+    }
+
+    if (failedCreationSet.isEmpty()) {
+      return alreadyExistingMeasurements;
+    }
+    throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
+  }
+
+  /**
+   * Activates the schema template on one device.
+   *
+   * @throws SemanticException if the status is neither success nor "template is in use"
+   */
+  private void internalActivateTemplate(PartialPath devicePath) {
+    TSStatus status = statementExecutor.apply(new ActivateTemplateStatement(devicePath)).status;
+    int code = status.getCode();
+    boolean activated =
+        code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            || code == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode();
+    if (!activated) {
+      throw new SemanticException(new IoTDBException(status.getMessage(), status.getCode()));
+    }
+  }
+
+  /**
+   * Activates schema templates on several devices with one batch statement.
+   *
+   * @throws SemanticException if the overall status, or any sub status of a multiple-error
+   *     status, is neither success nor "template is in use"
+   */
+  private void internalActivateTemplate(
+      Map<PartialPath, Pair<Template, PartialPath>> devicesNeedActivateTemplate) {
+    InternalBatchActivateTemplateStatement statement =
+        new InternalBatchActivateTemplateStatement(devicesNeedActivateTemplate);
+    TSStatus status = statementExecutor.apply(statement).status;
+    int code = status.getCode();
+
+    if (code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || code == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      return;
+    }
+    if (code != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      throw new SemanticException(new IoTDBException(status.getMessage(), status.getCode()));
+    }
+
+    // multiple-error status: only sub statuses that are real failures are reported
+    Set<String> failedActivationSet = new HashSet<>();
+    for (TSStatus subStatus : status.subStatus) {
+      int subCode = subStatus.getCode();
+      if (subCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          || subCode == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+        continue;
+      }
+      failedActivationSet.add(subStatus.message);
+    }
+    if (!failedActivationSet.isEmpty()) {
+      throw new SemanticException(new MetadataException(String.join("; ", failedActivationSet)));
+    }
+  }
+
+  /**
+   * Tries to create the grouped timeseries of several devices with one statement and merges both
+   * the timeseries reported as already existing and the newly created ones into the given schema
+   * tree.
+   *
+   * @param schemaTree tree that receives the measurements
+   * @param devicesNeedAutoCreateTimeSeries per device: its alignment flag (left) and the
+   *     measurements to create (right)
+   */
+  private void internalCreateTimeSeries(
+      ClusterSchemaTree schemaTree,
+      Map<PartialPath, Pair<Boolean, MeasurementGroup>> devicesNeedAutoCreateTimeSeries) {
+
+    List<MeasurementPath> existingPaths =
+        executeInternalCreateTimeseriesStatement(
+            new InternalCreateMultiTimeSeriesStatement(devicesNeedAutoCreateTimeSeries));
+
+    schemaTree.appendMeasurementPaths(existingPaths);
+
+    // names of the already existing measurements, grouped by device
+    Map<PartialPath, Set<String>> existingMeasurementsByDevice = new HashMap<>();
+    existingPaths.forEach(
+        existingPath ->
+            existingMeasurementsByDevice
+                .computeIfAbsent(existingPath.getDevicePath(), k -> new HashSet<>())
+                .add(existingPath.getMeasurement()));
+
+    // the requested measurements not reported as existing were created as requested
+    devicesNeedAutoCreateTimeSeries.forEach(
+        (devicePath, alignedAndGroup) -> {
+          Set<String> existingMeasurements =
+              existingMeasurementsByDevice.getOrDefault(devicePath, Collections.emptySet());
+          MeasurementGroup group = alignedAndGroup.right;
+          for (int i = 0; i < group.size(); i++) {
+            String measurement = group.getMeasurements().get(i);
+            if (existingMeasurements.contains(measurement)) {
+              continue;
+            }
+            schemaTree.appendSingleMeasurement(
+                devicePath.concatNode(measurement),
+                new MeasurementSchema(
+                    measurement,
+                    group.getDataTypes().get(i),
+                    group.getEncodings().get(i),
+                    group.getCompressors().get(i)),
+                null,
+                null,
+                alignedAndGroup.left);
+          }
+        });
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
new file mode 100644
index 0000000000..9e8576a18c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+class ClusterSchemaFetchExecutor {
+
+ // Used to poll the results of an issued schema fetch query and to release it afterwards.
+ private final Coordinator coordinator;
+ // Resolves the templates related to the fetched path patterns.
+ private final ITemplateManager templateManager;
+ // Supplies the id used for each schema fetch query.
+ private final Supplier<Long> queryIdProvider;
+ // Runs a statement under the given query id.
+ private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
+ // Receives the non-empty schema trees fetched by the caching fetch methods.
+ private final Consumer<ClusterSchemaTree> schemaCacheUpdater;
+
+ /**
+  * Creates an executor that fetches schema through the supplied collaborators.
+  *
+  * @param coordinator source of the query executions whose results are read
+  * @param templateManager used to look up templates related to path patterns
+  * @param queryIdProvider supplies a query id for each fetch
+  * @param statementExecutor executes a statement under a given query id
+  * @param schemaCacheUpdater invoked with fetched schema that should be cached
+  */
+ ClusterSchemaFetchExecutor(
+     Coordinator coordinator,
+     ITemplateManager templateManager,
+     Supplier<Long> queryIdProvider,
+     BiFunction<Long, Statement, ExecutionResult> statementExecutor,
+     Consumer<ClusterSchemaTree> schemaCacheUpdater) {
+   this.queryIdProvider = queryIdProvider;
+   this.statementExecutor = statementExecutor;
+   this.schemaCacheUpdater = schemaCacheUpdater;
+   this.templateManager = templateManager;
+   this.coordinator = coordinator;
+ }
+
+ /**
+  * Fetches schema for a pattern tree that may contain wildcards, or whose result does not need
+  * to be cached. The fetched schema is returned directly and is not handed to the schema cache
+  * updater.
+  *
+  * @param patternTree the patterns to fetch
+  * @param withTags forwarded to the {@link SchemaFetchStatement}
+  * @return fetched schema
+  */
+ ClusterSchemaTree fetchSchemaOfFuzzyMatch(PathPatternTree patternTree, boolean withTags) {
+   // Reuse the shared helper instead of repeating the template-collection loop here.
+   Map<Integer, Template> templateMap = analyzeTemplate(patternTree.getAllPathPatterns());
+   return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+ }
+
+ /**
+  * Fetches schema for a pattern tree without wildcards and passes a non-empty result to the
+  * schema cache updater.
+  *
+  * @param fullPathList all the full paths (no wildcard) that make up rawPatternTree; used to
+  *     collect the related templates
+  * @param rawPatternTree the pattern tree consisting of the fullPathList
+  * @return fetched schema
+  */
+ ClusterSchemaTree fetchSchemaOfPreciseMatch(
+     List<PartialPath> fullPathList, PathPatternTree rawPatternTree) {
+   Map<Integer, Template> relatedTemplates = analyzeTemplate(fullPathList);
+   ClusterSchemaTree fetched =
+       executeSchemaFetchQuery(
+           new SchemaFetchStatement(rawPatternTree, relatedTemplates, false));
+   if (fetched.isEmpty()) {
+     // Nothing to cache.
+     return fetched;
+   }
+   schemaCacheUpdater.accept(fetched);
+   return fetched;
+ }
+
+ /**
+  * Fetches (and caches) the schema of selected measurements of a single device.
+  *
+  * @param devicePath the device the measurements belong to
+  * @param measurements all measurement names of the device
+  * @param indexOfTargetMeasurements indexes into {@code measurements} selecting what to fetch
+  * @return fetched schema
+  */
+ ClusterSchemaTree fetchSchemaOfOneDevice(
+     PartialPath devicePath, String[] measurements, List<Integer> indexOfTargetMeasurements) {
+   PathPatternTree targetPaths = new PathPatternTree();
+   for (int measurementIndex : indexOfTargetMeasurements) {
+     String measurement = measurements[measurementIndex];
+     targetPaths.appendFullPath(devicePath, measurement);
+   }
+   targetPaths.constructTree();
+   return fetchSchemaAndCacheResult(targetPaths);
+ }
+
+ /**
+  * Fetches (and caches) the schema of selected measurements of several devices in one query.
+  *
+  * @param devicePathList all device paths
+  * @param measurementsList measurement names per device, parallel to {@code devicePathList}
+  * @param indexOfTargetDevices indexes into {@code devicePathList} selecting the devices
+  * @param indexOfTargetMeasurementsList parallel to {@code indexOfTargetDevices}; element
+  *     {@code i} holds the measurement indexes to fetch for the i-th selected device
+  * @return fetched schema
+  */
+ ClusterSchemaTree fetchSchemaOfMultiDevices(
+     List<PartialPath> devicePathList,
+     List<String[]> measurementsList,
+     List<Integer> indexOfTargetDevices,
+     List<List<Integer>> indexOfTargetMeasurementsList) {
+   PathPatternTree targetPaths = new PathPatternTree();
+   for (int pos = 0; pos < indexOfTargetDevices.size(); pos++) {
+     int deviceIndex = indexOfTargetDevices.get(pos);
+     for (int measurementIndex : indexOfTargetMeasurementsList.get(pos)) {
+       PartialPath devicePath = devicePathList.get(deviceIndex);
+       String measurement = measurementsList.get(deviceIndex)[measurementIndex];
+       targetPaths.appendFullPath(devicePath, measurement);
+     }
+   }
+   targetPaths.constructTree();
+   return fetchSchemaAndCacheResult(targetPaths);
+ }
+
+ /**
+  * Fetches the schema of the given pattern tree (without tags) and passes a non-empty result
+  * to the schema cache updater.
+  *
+  * @param patternTree the patterns to fetch
+  * @return fetched schema, possibly empty
+  */
+ private ClusterSchemaTree fetchSchemaAndCacheResult(PathPatternTree patternTree) {
+   Map<Integer, Template> relatedTemplates = analyzeTemplate(patternTree.getAllPathPatterns());
+   ClusterSchemaTree fetched =
+       executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, relatedTemplates, false));
+   if (fetched.isEmpty()) {
+     return fetched;
+   }
+   schemaCacheUpdater.accept(fetched);
+   return fetched;
+ }
+
+ /**
+  * Collects, into one map, the templates the template manager reports as related to any of the
+  * given path patterns.
+  *
+  * @param pathPatternList path patterns to check
+  * @return merged result of {@code checkAllRelatedTemplate} over all patterns
+  */
+ private Map<Integer, Template> analyzeTemplate(List<PartialPath> pathPatternList) {
+   Map<Integer, Template> relatedTemplates = new HashMap<>();
+   pathPatternList.forEach(
+       pattern -> relatedTemplates.putAll(templateManager.checkAllRelatedTemplate(pattern)));
+   return relatedTemplates;
+ }
+
+ private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
+ long queryId = queryIdProvider.get();
+ try {
+ ExecutionResult executionResult = statementExecutor.apply(queryId, schemaFetchStatement);
+ if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new RuntimeException(
+ String.format(
+ "cannot fetch schema, status is: %s, msg is: %s",
+ executionResult.status.getCode(), executionResult.status.getMessage()));
+ }
+ try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
+ ClusterSchemaTree result = new ClusterSchemaTree();
+ Set<String> databaseSet = new HashSet<>();
+ while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+ // The query will be transited to FINISHED when invoking getBatchResult() at the last time
+ // So we don't need to clean up it manually
+ Optional<TsBlock> tsBlock;
+ try {
+ tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+ } catch (IoTDBException e) {
+ throw new RuntimeException("Fetch Schema failed. ", e);
+ }
+ if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
+ break;
+ }
+ Column column = tsBlock.get().getColumn(0);
+ for (int i = 0; i < column.getPositionCount(); i++) {
+ parseFetchedData(column.getBinary(i), result, databaseSet);
+ }
+ }
+ result.setDatabases(databaseSet);
+ return result;
+ }
+ } finally {
+ coordinator.cleanupQueryExecution(queryId);
+ }
+ }
+
+ /**
+  * Decodes one serialized entry of a schema fetch result and folds it into the accumulators.
+  *
+  * <p>The first byte is a type tag: {@code 0} is followed by an int count and that many database
+  * names, which are added to {@code databaseSet}; {@code 1} is followed by a serialized {@link
+  * ClusterSchemaTree}, which is merged into {@code resultSchemaTree}.
+  *
+  * @param data the serialized entry
+  * @param resultSchemaTree schema tree that receives deserialized sub-trees
+  * @param databaseSet set that receives deserialized database names
+  * @throws RuntimeException if the type tag is not recognized or the entry cannot be decoded
+  */
+ private void parseFetchedData(
+     Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
+   InputStream inputStream = new ByteArrayInputStream(data.getValues());
+   try {
+     byte type = ReadWriteIOUtils.readByte(inputStream);
+     if (type == 0) {
+       int size = ReadWriteIOUtils.readInt(inputStream);
+       for (int i = 0; i < size; i++) {
+         databaseSet.add(ReadWriteIOUtils.readString(inputStream));
+       }
+     } else if (type == 1) {
+       resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
+     } else {
+       throw new RuntimeException(
+           new MetadataException("Failed to fetch schema because of unrecognized data"));
+     }
+   } catch (IOException e) {
+     // The stream is purely in memory, so this is not expected. If it does happen the entry
+     // could not be decoded; fail loudly instead of returning a silently incomplete result.
+     throw new RuntimeException("Failed to fetch schema because of unreadable data", e);
+   }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
new file mode 100644
index 0000000000..09868e6b9a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class ClusterSchemaFetcher implements ISchemaFetcher {
+
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ private final Coordinator coordinator = Coordinator.getInstance();
+ private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
+ private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
+
+ // Runs auto-create statements; a new query id is requested for every statement.
+ private final AutoCreateSchemaExecutor autoCreateSchemaExecutor =
+     new AutoCreateSchemaExecutor(
+         templateManager,
+         autoCreateStatement -> {
+           long autoCreateQueryId = SessionManager.getInstance().requestQueryId();
+           return coordinator.execute(
+               autoCreateStatement,
+               autoCreateQueryId,
+               null,
+               "",
+               ClusterPartitionFetcher.getInstance(),
+               this,
+               config.getQueryTimeoutThreshold());
+         });
+
+ // Runs schema fetch statements under a caller-provided query id and stores fetched schema in
+ // the schema cache.
+ private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor =
+     new ClusterSchemaFetchExecutor(
+         coordinator,
+         templateManager,
+         () -> SessionManager.getInstance().requestQueryId(),
+         (fetchQueryId, fetchStatement) -> {
+           return coordinator.execute(
+               fetchStatement,
+               fetchQueryId,
+               null,
+               "",
+               ClusterPartitionFetcher.getInstance(),
+               this,
+               config.getQueryTimeoutThreshold());
+         },
+         schemaCache::put);
+
+ /** Holder class whose static field carries the single {@link ClusterSchemaFetcher}. */
+ private static final class ClusterSchemaFetcherHolder {
+   private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
+
+   private ClusterSchemaFetcherHolder() {}
+ }
+
+ /** Returns the shared {@link ClusterSchemaFetcher} instance. */
+ public static ClusterSchemaFetcher getInstance() {
+   return ClusterSchemaFetcherHolder.INSTANCE;
+ }
+
+ /** Not instantiable from outside; use {@link #getInstance()}. */
+ private ClusterSchemaFetcher() {}
+
+ /**
+  * Fetches the schema matching the given pattern tree.
+  *
+  * <p>If any pattern contains a wildcard, the whole tree is fetched through the fuzzy-match
+  * path, whose result is not put into the schema cache. Otherwise every full path is looked up
+  * in the cache first, and a precise-match fetch is issued only if at least one path is not
+  * cached.
+  *
+  * @param patternTree patterns to fetch; {@code constructTree()} is invoked on it here
+  * @return the schema tree assembled from the cache or returned by the fetch
+  */
+ @Override
+ public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
+   patternTree.constructTree();
+   List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
+   List<PartialPath> fullPathList = new ArrayList<>();
+   for (PartialPath pattern : pathPatternList) {
+     if (!pattern.hasWildcard()) {
+       fullPathList.add(pattern);
+     }
+   }
+
+   // At least one pattern has a wildcard.
+   if (fullPathList.size() < pathPatternList.size()) {
+     return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(patternTree, false);
+   }
+
+   // From here on every pattern is a full path, so no further size comparison is needed.
+
+   // The schema cache R/W and fetch operation must be locked together thus the cache clean
+   // operation executed by delete timeseries will be effective.
+   schemaCache.takeReadLock();
+   try {
+     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+     Set<String> storageGroupSet = new HashSet<>();
+     boolean isAllCached = true;
+     for (PartialPath fullPath : fullPathList) {
+       ClusterSchemaTree cachedSchema = schemaCache.get(fullPath);
+       if (cachedSchema.isEmpty()) {
+         // One miss is enough to require a fetch of the whole pattern tree.
+         isAllCached = false;
+         break;
+       }
+       schemaTree.mergeSchemaTree(cachedSchema);
+       storageGroupSet.addAll(cachedSchema.getDatabases());
+     }
+     if (isAllCached) {
+       schemaTree.setDatabases(storageGroupSet);
+       return schemaTree;
+     }
+
+     return clusterSchemaFetchExecutor.fetchSchemaOfPreciseMatch(fullPathList, patternTree);
+   } finally {
+     schemaCache.releaseReadLock();
+   }
+ }
+
+ /**
+  * Fetches the schema matching the pattern tree with tags requested. This always goes through
+  * the fuzzy-match fetch and does not consult the schema cache.
+  */
+ @Override
+ public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
+   patternTree.constructTree();
+   final boolean withTags = true;
+   return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(patternTree, withTags);
+ }
+
+ /**
+  * Resolves the schema of the given measurements of one device in three steps: read the schema
+  * cache, fetch what the cache does not hold, and, if auto-creation is enabled, create what is
+  * still missing after the fetch.
+  *
+  * @param devicePath the device
+  * @param measurements measurement names under the device
+  * @param getDataType forwarded to the auto-create executor
+  * @param isAligned forwarded to the auto-create executor
+  * @return schema tree holding everything that was found or created
+  */
+ @Override
+ public ISchemaTree fetchSchemaWithAutoCreate(
+     PartialPath devicePath,
+     String[] measurements,
+     Function<Integer, TSDataType> getDataType,
+     boolean isAligned) {
+   // The schema cache R/W and fetch operation must be locked together thus the cache clean
+   // operation executed by delete timeseries will be effective.
+   schemaCache.takeReadLock();
+   try {
+     ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
+     List<Integer> missingAfterCache =
+         checkMissingMeasurements(schemaTree, devicePath, measurements);
+     if (missingAfterCache.isEmpty()) {
+       // Cache hit for every measurement.
+       return schemaTree;
+     }
+
+     // Fetch what the cache did not hold; the executor also caches what it fetches.
+     ClusterSchemaTree fetchedTree =
+         clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
+             devicePath, measurements, missingAfterCache);
+     if (!fetchedTree.isEmpty()) {
+       schemaTree.mergeSchemaTree(fetchedTree);
+     }
+
+     if (config.isAutoCreateSchemaEnabled()) {
+       // Create whatever is still absent after the fetch and merge it into schemaTree.
+       List<Integer> missingAfterFetch =
+           checkMissingMeasurementsAfterSchemaFetch(
+               schemaTree, devicePath, missingAfterCache, measurements);
+       if (!missingAfterFetch.isEmpty()) {
+         autoCreateSchemaExecutor.autoCreateMissingMeasurements(
+             schemaTree, devicePath, missingAfterFetch, measurements, getDataType, isAligned);
+       }
+     }
+
+     return schemaTree;
+   } finally {
+     schemaCache.releaseReadLock();
+   }
+ }
+
+ /**
+  * Convenience overload that passes {@code null} for both the encodings list and the compression
+  * types list of the six-argument variant.
+  */
+ @Override
+ public ISchemaTree fetchSchemaListWithAutoCreate(
+     List<PartialPath> devicePathList,
+     List<String[]> measurementsList,
+     List<TSDataType[]> tsDataTypesList,
+     List<Boolean> isAlignedList) {
+   final List<TSEncoding[]> noEncodings = null;
+   final List<CompressionType[]> noCompressionTypes = null;
+   return fetchSchemaListWithAutoCreate(
+       devicePathList,
+       measurementsList,
+       tsDataTypesList,
+       noEncodings,
+       noCompressionTypes,
+       isAlignedList);
+ }
+
+ /**
+  * Resolves the schema of the given measurements of several devices in three steps: read the
+  * schema cache, fetch in one query what the cache does not hold, and, if auto-creation is
+  * enabled, create in one batch what is still missing after the fetch.
+  *
+  * @param devicePathList the devices
+  * @param measurementsList measurement names per device, parallel to {@code devicePathList}
+  * @param tsDataTypesList forwarded to the auto-create executor
+  * @param encodingsList forwarded to the auto-create executor
+  * @param compressionTypesList forwarded to the auto-create executor
+  * @param isAlignedList forwarded to the auto-create executor
+  * @return schema tree holding everything that was found or created
+  */
+ @Override
+ public ISchemaTree fetchSchemaListWithAutoCreate(
+     List<PartialPath> devicePathList,
+     List<String[]> measurementsList,
+     List<TSDataType[]> tsDataTypesList,
+     List<TSEncoding[]> encodingsList,
+     List<CompressionType[]> compressionTypesList,
+     List<Boolean> isAlignedList) {
+   // The schema cache R/W and fetch operation must be locked together thus the cache clean
+   // operation executed by delete timeseries will be effective.
+   schemaCache.takeReadLock();
+   try {
+     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+     // The two lists below are parallel: entry k of the second belongs to device entry k of
+     // the first.
+     List<Integer> devicesMissingAfterCache = new ArrayList<>();
+     List<List<Integer>> measurementsMissingAfterCache = new ArrayList<>(devicePathList.size());
+     for (int i = 0; i < devicePathList.size(); i++) {
+       PartialPath devicePath = devicePathList.get(i);
+       String[] measurements = measurementsList.get(i);
+       schemaTree.mergeSchemaTree(schemaCache.get(devicePath, measurements));
+       List<Integer> missing = checkMissingMeasurements(schemaTree, devicePath, measurements);
+       if (missing.isEmpty()) {
+         continue;
+       }
+       devicesMissingAfterCache.add(i);
+       measurementsMissingAfterCache.add(missing);
+     }
+
+     if (devicesMissingAfterCache.isEmpty()) {
+       // Cache hit for every measurement of every device.
+       return schemaTree;
+     }
+
+     // Fetch what the cache did not hold; the executor also caches what it fetches.
+     ClusterSchemaTree fetchedTree =
+         clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
+             devicePathList,
+             measurementsList,
+             devicesMissingAfterCache,
+             measurementsMissingAfterCache);
+     if (!fetchedTree.isEmpty()) {
+       schemaTree.mergeSchemaTree(fetchedTree);
+     }
+
+     if (!config.isAutoCreateSchemaEnabled()) {
+       return schemaTree;
+     }
+
+     // Work out, per device, what is still absent after the fetch.
+     List<Integer> devicesNeedingAutoCreate = new ArrayList<>();
+     List<List<Integer>> measurementsNeedingAutoCreate = new ArrayList<>();
+     for (int i = 0; i < devicesMissingAfterCache.size(); i++) {
+       int deviceIndex = devicesMissingAfterCache.get(i);
+       List<Integer> stillMissing =
+           checkMissingMeasurementsAfterSchemaFetch(
+               schemaTree,
+               devicePathList.get(deviceIndex),
+               measurementsMissingAfterCache.get(i),
+               measurementsList.get(deviceIndex));
+       if (stillMissing.isEmpty()) {
+         continue;
+       }
+       devicesNeedingAutoCreate.add(deviceIndex);
+       measurementsNeedingAutoCreate.add(stillMissing);
+     }
+
+     if (!devicesNeedingAutoCreate.isEmpty()) {
+       autoCreateSchemaExecutor.autoCreateMissingMeasurements(
+           schemaTree,
+           devicePathList,
+           devicesNeedingAutoCreate,
+           measurementsNeedingAutoCreate,
+           measurementsList,
+           tsDataTypesList,
+           encodingsList,
+           compressionTypesList,
+           isAlignedList);
+     }
+
+     return schemaTree;
+   } finally {
+     schemaCache.releaseReadLock();
+   }
+ }
+
+ @Override
+ public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
+ return templateManager.checkTemplateSetInfo(path);
+ }
+
+ @Override
+ public Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern) {
+ return templateManager.checkAllRelatedTemplate(pathPattern);
+ }
+
+ @Override
+ public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
+ return templateManager.getAllPathsSetTemplate(templateName);
+ }
+
+ /**
+  * Finds which of the given measurements have no schema in {@code schemaTree}.
+  *
+  * @param schemaTree the tree to search
+  * @param devicePath the device the measurements belong to
+  * @param measurements measurement names to look up
+  * @return indexes into {@code measurements} whose schema is absent; all indexes if the device
+  *     lookup itself returns nothing
+  */
+ private List<Integer> checkMissingMeasurements(
+     ISchemaTree schemaTree, PartialPath devicePath, String[] measurements) {
+   DeviceSchemaInfo deviceInfo =
+       schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
+
+   List<Integer> missingIndexes = new ArrayList<>();
+   if (deviceInfo == null) {
+     // Nothing known about the device: every measurement is missing.
+     for (int i = 0; i < measurements.length; i++) {
+       missingIndexes.add(i);
+     }
+     return missingIndexes;
+   }
+
+   // The schema list is read at the same positions as the requested measurements; a null
+   // entry marks a measurement without schema.
+   List<MeasurementSchema> schemas = deviceInfo.getMeasurementSchemaList();
+   for (int i = 0; i < measurements.length; i++) {
+     boolean present = schemas.get(i) != null;
+     if (!present) {
+       missingIndexes.add(i);
+     }
+   }
+   return missingIndexes;
+ }
+
+ /**
+  * Re-checks, after a schema fetch, which of the previously missing measurements still have no
+  * schema in {@code schemaTree}.
+  *
+  * @param schemaTree the tree to search
+  * @param devicePath the device the measurements belong to
+  * @param indexOfTargetMeasurements indexes into {@code measurements} that were missing before
+  * @param measurements all measurement names of the device
+  * @return the subset of {@code indexOfTargetMeasurements} still without schema; the given list
+  *     itself if the device lookup returns nothing
+  */
+ private List<Integer> checkMissingMeasurementsAfterSchemaFetch(
+     ClusterSchemaTree schemaTree,
+     PartialPath devicePath,
+     List<Integer> indexOfTargetMeasurements,
+     String[] measurements) {
+   List<String> targetNames = new ArrayList<>();
+   for (int targetIndex : indexOfTargetMeasurements) {
+     targetNames.add(measurements[targetIndex]);
+   }
+
+   DeviceSchemaInfo deviceInfo = schemaTree.searchDeviceSchemaInfo(devicePath, targetNames);
+   if (deviceInfo == null) {
+     return indexOfTargetMeasurements;
+   }
+
+   // Position i of the schema list is mapped back through indexOfTargetMeasurements to the
+   // index in the full measurements array.
+   List<MeasurementSchema> schemas = deviceInfo.getMeasurementSchemaList();
+   return IntStream.range(0, schemas.size())
+       .filter(i -> schemas.get(i) == null)
+       .mapToObj(indexOfTargetMeasurements::get)
+       .collect(Collectors.toCollection(ArrayList::new));
+ }
+
+ /** Clears the data node schema cache via its {@code cleanUp()} method. */
+ @Override
+ public void invalidAllCache() {
+   // schemaCache is initialised from DataNodeSchemaCache.getInstance().
+   schemaCache.cleanUp();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
index 5ec75fe1da..37516321c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 391aed20dc..31b7472b44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.BatchInsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
index 7e88374da7..58d3a5369c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
@@ -150,4 +150,7 @@ public enum StatementType {
SHOW_TRIGGERS,
DEACTIVATE_TEMPLATE,
+
+ INTERNAL_BATCH_ACTIVATE_TEMPLATE,
+ INTERNAL_CREATE_MULTI_TIMESERIES
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index e7107c06df..6a399b6c70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -35,8 +35,8 @@ import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySource;
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceContext;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index cbbc9d4131..bfa4e3d0ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.mpp.plan.planner;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
@@ -29,6 +31,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSe
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
@@ -49,6 +53,8 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalBatchActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -65,8 +71,10 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -339,6 +347,15 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
internalCreateTimeSeriesStatement.isAligned());
}
+ @Override
+ public PlanNode visitInternalCreateMultiTimeSeries(
+ InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement,
+ MPPQueryContext context) {
+ return new InternalCreateMultiTimeSeriesNode(
+ context.getQueryId().genPlanNodeId(),
+ internalCreateMultiTimeSeriesStatement.getDeviceMap());
+ }
+
@Override
public PlanNode visitCreateMultiTimeseries(
CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) {
@@ -689,6 +706,21 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
analysis.getTemplateSetInfo().left.getId());
}
+ /**
+  * Plans an internal batch template activation as a single {@link
+  * InternalBatchActivateTemplateNode}.
+  *
+  * <p>For every device of the statement the node receives the template id and the node length
+  * of the paired path minus one (presumably the level at which the template is set; confirm
+  * against the node's consumer).
+  */
+ @Override
+ public PlanNode visitInternalBatchActivateTemplate(
+     InternalBatchActivateTemplateStatement internalBatchActivateTemplateStatement,
+     MPPQueryContext context) {
+   Map<PartialPath, Pair<Integer, Integer>> activationByDevice = new HashMap<>();
+   internalBatchActivateTemplateStatement
+       .getDeviceMap()
+       .forEach(
+           (devicePath, templateSetInfo) -> {
+             Template template = templateSetInfo.left;
+             PartialPath pairedPath = templateSetInfo.right;
+             activationByDevice.put(
+                 devicePath, new Pair<>(template.getId(), pairedPath.getNodeLength() - 1));
+           });
+   return new InternalBatchActivateTemplateNode(
+       context.getQueryId().genPlanNodeId(), activationByDevice);
+ }
+
@Override
public PlanNode visitShowPathsUsingTemplate(
ShowPathsUsingTemplateStatement showPathsUsingTemplateStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 0f846ad25b..5529e3cdf6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -42,6 +42,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMulti
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InvalidateSchemaCacheNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode;
@@ -152,7 +154,9 @@ public enum PlanNodeType {
DEACTIVATE_TEMPLATE_NODE((short) 61),
INTO((short) 62),
DEVICE_VIEW_INTO((short) 63),
- VERTICALLY_CONCAT((short) 64);
+ VERTICALLY_CONCAT((short) 64),
+ INTERNAL_BATCH_ACTIVATE_TEMPLATE((short) 68),
+ INTERNAL_CREATE_MULTI_TIMESERIES((short) 69);
public static final int BYTES = Short.BYTES;
@@ -331,6 +335,10 @@ public enum PlanNodeType {
return DeviceViewIntoNode.deserialize(buffer);
case 64:
return VerticallyConcatNode.deserialize(buffer);
+ case 68:
+ return InternalBatchActivateTemplateNode.deserialize(buffer);
+ case 69:
+ return InternalCreateMultiTimeSeriesNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8bf41c86ec..86f8cbf946 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -41,6 +41,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMulti
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackPreDeactivateTemplateNode;
@@ -331,4 +333,12 @@ public abstract class PlanVisitor<R, C> {
public R visitVerticallyConcat(VerticallyConcatNode node, C context) {
return visitPlan(node, context); // default: delegate to the generic visitPlan handler
}
+
+ public R visitInternalBatchActivateTemplate(InternalBatchActivateTemplateNode node, C context) {
+ return visitPlan(node, context); // default: delegate to the generic visitPlan handler
+ }
+
+ public R visitInternalCreateMultiTimeSeries(InternalCreateMultiTimeSeriesNode node, C context) {
+ return visitPlan(node, context); // default: delegate to the generic visitPlan handler
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
new file mode 100644
index 0000000000..d705ba08fa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InternalBatchActivateTemplateNode extends WritePlanNode {
+ // Leaf write plan node describing which template to activate on each device of a batch.
+ // devicePath -> <templateId, templateSetLevel>
+ private Map<PartialPath, Pair<Integer, Integer>> templateActivationMap;
+ // Target region of this node; stays null unless the private constructor below assigns it.
+ private TRegionReplicaSet regionReplicaSet;
+ // Creates a node that is not yet bound to a region (regionReplicaSet is left unset).
+ public InternalBatchActivateTemplateNode(
+ PlanNodeId id, Map<PartialPath, Pair<Integer, Integer>> templateActivationMap) {
+ super(id);
+ this.templateActivationMap = templateActivationMap;
+ }
+ // Region-bound variant; within this class it is called by clone() and splitByPartition().
+ private InternalBatchActivateTemplateNode(
+ PlanNodeId id,
+ Map<PartialPath, Pair<Integer, Integer>> templateActivationMap,
+ TRegionReplicaSet regionReplicaSet) {
+ super(id);
+ this.templateActivationMap = templateActivationMap;
+ this.regionReplicaSet = regionReplicaSet;
+ }
+
+ public Map<PartialPath, Pair<Integer, Integer>> getTemplateActivationMap() {
+ return templateActivationMap; // the internal map itself, not a copy
+ }
+
+ @Override
+ public TRegionReplicaSet getRegionReplicaSet() {
+ return regionReplicaSet;
+ }
+ // A fresh, empty list is returned on every call.
+ @Override
+ public List<PlanNode> getChildren() {
+ return new ArrayList<>();
+ }
+ // No-op: the given child is discarded (allowedChildCount() returns 0).
+ @Override
+ public void addChild(PlanNode child) {}
+ // Shallow copy: same plan node id, same map instance and same replica set.
+ @Override
+ public PlanNode clone() {
+ return new InternalBatchActivateTemplateNode(
+ getPlanNodeId(), templateActivationMap, regionReplicaSet);
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return 0;
+ }
+ // Always returns null; this node reports no output columns.
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+ // Written layout: node type, entry count, then per entry: device path, templateId, templateSetLevel.
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.INTERNAL_BATCH_ACTIVATE_TEMPLATE.serialize(byteBuffer);
+ // regionReplicaSet is not written by this method.
+ int size = templateActivationMap.size();
+ ReadWriteIOUtils.write(size, byteBuffer);
+ for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry : templateActivationMap.entrySet()) {
+ entry.getKey().serialize(byteBuffer);
+ ReadWriteIOUtils.write(entry.getValue().left, byteBuffer);
+ ReadWriteIOUtils.write(entry.getValue().right, byteBuffer);
+ }
+ }
+ // Same layout as the ByteBuffer overload, written to a stream instead.
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws IOException {
+ PlanNodeType.INTERNAL_BATCH_ACTIVATE_TEMPLATE.serialize(stream);
+
+ int size = templateActivationMap.size();
+ ReadWriteIOUtils.write(size, stream);
+ for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry : templateActivationMap.entrySet()) {
+ entry.getKey().serialize(stream);
+ ReadWriteIOUtils.write(entry.getValue().left, stream);
+ ReadWriteIOUtils.write(entry.getValue().right, stream);
+ }
+ }
+ // Rebuilds a node from the entries written by serializeAttributes, followed by the plan node id.
+ public static InternalBatchActivateTemplateNode deserialize(ByteBuffer byteBuffer) {
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ Map<PartialPath, Pair<Integer, Integer>> templateActivationMap = new HashMap<>(size);
+ for (int i = 0; i < size; i++) {
+ templateActivationMap.put( // arguments evaluate left to right: path, templateId, templateSetLevel
+ (PartialPath) PathDeserializeUtil.deserialize(byteBuffer),
+ new Pair<>(ReadWriteIOUtils.readInt(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer)));
+ }
+ // NOTE(review): the node-type marker is presumably consumed by the caller beforehand - confirm.
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new InternalBatchActivateTemplateNode(planNodeId, templateActivationMap); // regionReplicaSet is not restored here
+ }
+ // Splits this node into one node per schema region replica set, each holding that region's devices.
+ @Override
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
+ // gather the devices that resolve to the same target region
+ Map<TRegionReplicaSet, Map<PartialPath, Pair<Integer, Integer>>> splitMap = new HashMap<>();
+ for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry : templateActivationMap.entrySet()) {
+ TRegionReplicaSet regionReplicaSet = // local variable; shadows the field of the same name
+ analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(entry.getKey().getFullPath());
+ splitMap
+ .computeIfAbsent(regionReplicaSet, k -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ }
+ // one region-bound node per group; all of them reuse this node's plan node id
+ List<WritePlanNode> result = new ArrayList<>();
+ for (Map.Entry<TRegionReplicaSet, Map<PartialPath, Pair<Integer, Integer>>> entry :
+ splitMap.entrySet()) {
+ result.add(
+ new InternalBatchActivateTemplateNode(getPlanNodeId(), entry.getValue(), entry.getKey()));
+ }
+
+ return result;
+ }
+ // Visitor dispatch to visitInternalBatchActivateTemplate.
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitInternalBatchActivateTemplate(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
new file mode 100644
index 0000000000..40cd224e2f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InternalCreateMultiTimeSeriesNode extends WritePlanNode {
+ // Leaf write plan node carrying, per device path, a flag and the MeasurementGroup to create.
+ private Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap; // devicePath -> <isAligned, measurementGroup>
+ // Target region of this node; stays null unless the private constructor below assigns it.
+ private TRegionReplicaSet regionReplicaSet;
+ // Creates a node that is not yet bound to a region (regionReplicaSet is left unset).
+ public InternalCreateMultiTimeSeriesNode(
+ PlanNodeId id, Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap) {
+ super(id);
+ this.deviceMap = deviceMap;
+ }
+ // Region-bound variant; within this class it is called by clone() and splitByPartition().
+ private InternalCreateMultiTimeSeriesNode(
+ PlanNodeId id,
+ Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap,
+ TRegionReplicaSet regionReplicaSet) {
+ super(id);
+ this.deviceMap = deviceMap;
+ this.regionReplicaSet = regionReplicaSet;
+ }
+
+ public Map<PartialPath, Pair<Boolean, MeasurementGroup>> getDeviceMap() {
+ return deviceMap; // the internal map itself, not a copy
+ }
+
+ @Override
+ public TRegionReplicaSet getRegionReplicaSet() {
+ return regionReplicaSet;
+ }
+ // A fresh, empty list is returned on every call.
+ @Override
+ public List<PlanNode> getChildren() {
+ return new ArrayList<>();
+ }
+ // No-op: the given child is discarded (allowedChildCount() returns 0).
+ @Override
+ public void addChild(PlanNode child) {}
+ // Shallow copy: same plan node id, same map instance and same replica set.
+ @Override
+ public PlanNode clone() {
+ return new InternalCreateMultiTimeSeriesNode(getPlanNodeId(), deviceMap, regionReplicaSet);
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return 0;
+ }
+ // Always returns null; this node reports no output columns.
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+ // Written layout: node type, entry count, then per entry: device path, boolean flag, measurement group.
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.INTERNAL_CREATE_MULTI_TIMESERIES.serialize(byteBuffer);
+ // regionReplicaSet is not written by this method.
+ ReadWriteIOUtils.write(deviceMap.size(), byteBuffer);
+ for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> entry : deviceMap.entrySet()) {
+ entry.getKey().serialize(byteBuffer);
+ ReadWriteIOUtils.write(entry.getValue().left, byteBuffer);
+ entry.getValue().right.serialize(byteBuffer);
+ }
+ }
+ // Same layout as the ByteBuffer overload, written to a stream instead.
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws IOException {
+ PlanNodeType.INTERNAL_CREATE_MULTI_TIMESERIES.serialize(stream);
+
+ ReadWriteIOUtils.write(deviceMap.size(), stream);
+ for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> entry : deviceMap.entrySet()) {
+ entry.getKey().serialize(stream);
+ ReadWriteIOUtils.write(entry.getValue().left, stream);
+ entry.getValue().right.serialize(stream);
+ }
+ }
+ // Rebuilds a node from the entries written by serializeAttributes, followed by the plan node id.
+ public static InternalCreateMultiTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap = new HashMap<>(size);
+ PartialPath devicePath;
+ boolean isAligned;
+ MeasurementGroup measurementGroup;
+ for (int i = 0; i < size; i++) { // reads mirror the write order: path, flag, group
+ devicePath = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
+ isAligned = ReadWriteIOUtils.readBool(byteBuffer);
+ measurementGroup = new MeasurementGroup(); // a new instance per entry, filled in from the buffer
+ measurementGroup.deserialize(byteBuffer);
+ deviceMap.put(devicePath, new Pair<>(isAligned, measurementGroup));
+ }
+ // NOTE(review): the node-type marker is presumably consumed by the caller beforehand - confirm.
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new InternalCreateMultiTimeSeriesNode(planNodeId, deviceMap); // regionReplicaSet is not restored here
+ }
+ // Splits this node into one node per schema region replica set, each holding that region's devices.
+ @Override
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
+ // gather the devices that resolve to the same target region
+ Map<TRegionReplicaSet, Map<PartialPath, Pair<Boolean, MeasurementGroup>>> splitMap =
+ new HashMap<>();
+ for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> entry : deviceMap.entrySet()) {
+ TRegionReplicaSet regionReplicaSet = // local variable; shadows the field of the same name
+ analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(entry.getKey().getFullPath());
+ splitMap
+ .computeIfAbsent(regionReplicaSet, k -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ }
+ // one region-bound node per group; all of them reuse this node's plan node id
+ List<WritePlanNode> result = new ArrayList<>();
+ for (Map.Entry<TRegionReplicaSet, Map<PartialPath, Pair<Boolean, MeasurementGroup>>> entry :
+ splitMap.entrySet()) {
+ result.add(
+ new InternalCreateMultiTimeSeriesNode(getPlanNodeId(), entry.getValue(), entry.getKey()));
+ }
+
+ return result;
+ }
+ // Visitor dispatch to visitInternalCreateMultiTimeSeries.
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitInternalCreateMultiTimeSeries(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index ebceb0e10b..cf7dc610d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 4d55216dfe..0d433a5901 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalBatchActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -450,4 +452,14 @@ public abstract class StatementVisitor<R, C> {
DropSchemaTemplateStatement dropSchemaTemplateStatement, C context) {
return visitStatement(dropSchemaTemplateStatement, context);
}
+
+ public R visitInternalBatchActivateTemplate(
+ InternalBatchActivateTemplateStatement internalBatchActivateTemplateStatement, C context) {
+ return visitStatement(internalBatchActivateTemplateStatement, context); // default: generic visitStatement
+ }
+
+ public R visitInternalCreateMultiTimeSeries(
+ InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement, C context) {
+ return visitStatement(internalCreateMultiTimeSeriesStatement, context); // default: generic visitStatement
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalBatchActivateTemplateStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalBatchActivateTemplateStatement.java
new file mode 100644
index 0000000000..8e8008c68d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalBatchActivateTemplateStatement.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.internal;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+// Only used to automatically activate templates on multiple devices while inserting data.
+public class InternalBatchActivateTemplateStatement extends Statement {
+
+ // devicePath -> <Template, TemplateSetPath>
+ private Map<PartialPath, Pair<Template, PartialPath>> deviceMap;
+ // Sets the statement type to INTERNAL_BATCH_ACTIVATE_TEMPLATE and keeps the given map (no copy).
+ public InternalBatchActivateTemplateStatement(
+ Map<PartialPath, Pair<Template, PartialPath>> deviceMap) {
+ super();
+ setType(StatementType.INTERNAL_BATCH_ACTIVATE_TEMPLATE);
+ this.deviceMap = deviceMap;
+ }
+
+ public Map<PartialPath, Pair<Template, PartialPath>> getDeviceMap() {
+ return deviceMap; // the internal map itself, not a copy
+ }
+ // Returns a new list holding the device paths, i.e. the keys of deviceMap.
+ @Override
+ public List<? extends PartialPath> getPaths() {
+ return new ArrayList<>(deviceMap.keySet());
+ }
+ // Visitor dispatch to visitInternalBatchActivateTemplate.
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitInternalBatchActivateTemplate(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalCreateMultiTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalCreateMultiTimeSeriesStatement.java
new file mode 100644
index 0000000000..828b431cfe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/InternalCreateMultiTimeSeriesStatement.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.internal;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class InternalCreateMultiTimeSeriesStatement extends Statement {
+ // devicePath -> <Boolean flag, MeasurementGroup>; NOTE(review): flag presumably means "aligned" - confirm.
+ private Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap;
+ // Sets the statement type to INTERNAL_CREATE_MULTI_TIMESERIES and keeps the given map (no copy).
+ public InternalCreateMultiTimeSeriesStatement(
+ Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap) {
+ super();
+ statementType = StatementType.INTERNAL_CREATE_MULTI_TIMESERIES; // assigns the inherited field directly
+ this.deviceMap = deviceMap;
+ }
+
+ public Map<PartialPath, Pair<Boolean, MeasurementGroup>> getDeviceMap() {
+ return deviceMap; // the internal map itself, not a copy
+ }
+ // Returns a new list holding the device paths, i.e. the keys of deviceMap.
+ @Override
+ public List<? extends PartialPath> getPaths() {
+ return new ArrayList<>(deviceMap.keySet());
+ }
+ // Visitor dispatch to visitInternalCreateMultiTimeSeries.
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitInternalCreateMultiTimeSeries(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index cecee37dfb..6a30d70799 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.query.control.SessionManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
index 4641456bdc..d927d18042 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
index 4892e13da7..1c29a6ee52 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
index ce21ce89fd..648c3f5903 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
@@ -27,11 +27,11 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index daf3de06d5..a52a9e4983 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -36,11 +36,11 @@ import org.apache.iotdb.db.metadata.template.TemplateQueryType;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 80bf535a27..bf0a312c8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -77,11 +77,11 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 3706fbfd99..dad1f162a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
index 754893d562..1d6509191b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
/**
* This interface is used to load files, including tsFile, syncTask, schema, modsFile and
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index 2f05dddf14..8ae5dba86d 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.query.control.SessionManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index f468e09d9b..89430bc91f 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
index abaff37ba7..ffb3902c9c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 61951feee4..a65b83d03a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.common.schematree.node.SchemaEntityNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index a4a3c6c580..7c908b2f4d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -46,7 +46,7 @@ import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;