You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/04/21 10:59:55 UTC
[iotdb] branch master updated: [IOTDB-3565] Support Dynamic Schema Template (#9663)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 06ee7cca94 [IOTDB-3565] Support Dynamic Schema Template (#9663)
06ee7cca94 is described below
commit 06ee7cca9403cf977f2cb2efba9095ca39019c38
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Fri Apr 21 18:59:49 2023 +0800
[IOTDB-3565] Support Dynamic Schema Template (#9663)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 5 +
.../consensus/request/ConfigPhysicalPlan.java | 4 +
.../consensus/request/ConfigPhysicalPlanType.java | 1 +
.../write/template/ExtendSchemaTemplatePlan.java | 58 +++++
.../confignode/manager/ClusterSchemaManager.java | 77 ++++++
.../iotdb/confignode/manager/ConfigManager.java | 20 ++
.../apache/iotdb/confignode/manager/IManager.java | 3 +
.../persistence/executor/ConfigPlanExecutor.java | 3 +
.../persistence/schema/ClusterSchemaInfo.java | 12 +
.../persistence/schema/TemplateTable.java | 47 ++++
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 +
.../iotdb/db/it/schema/IoTDBExtendTemplateIT.java | 177 ++++++++++++++
.../iotdb/db/it/schema/IoTDBSchemaTemplateIT.java | 4 +-
.../session/it/IoTDBSessionSchemaTemplateIT.java | 62 +++++
.../apache/iotdb/db/client/ConfigNodeClient.java | 22 ++
.../iotdb/db/engine/storagegroup/DataRegion.java | 3 -
.../cache/DataNodeTemplateSchemaCache.java | 118 +++++++++-
.../metadata/template/ClusterTemplateManager.java | 11 +
.../iotdb/db/metadata/template/Template.java | 16 +-
...teType.java => TemplateAlterOperationType.java} | 21 +-
.../template/TemplateInternalRPCUpdateType.java | 5 +-
.../metadata/template/TemplateInternalRPCUtil.java | 10 +
.../metadata/template/alter/TemplateAlterInfo.java | 43 ++++
.../template/alter/TemplateAlterOperationUtil.java | 51 ++++
.../template/alter/TemplateExtendInfo.java | 161 +++++++++++++
.../analyze/schema/AutoCreateSchemaExecutor.java | 254 ++++++++++++++++----
.../plan/analyze/schema/ClusterSchemaFetcher.java | 262 ++++-----------------
.../plan/analyze/schema/NormalSchemaFetcher.java | 202 ++++++++++++++++
.../plan/analyze/schema/TemplateSchemaFetcher.java | 246 +++++++++++++++++++
.../plan/execution/config/ConfigTaskVisitor.java | 8 +
.../config/executor/ClusterConfigTaskExecutor.java | 46 ++++
.../config/executor/IConfigTaskExecutor.java | 4 +
.../metadata/template/AlterSchemaTemplateTask.java | 45 ++++
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 27 +++
.../iotdb/db/mpp/plan/statement/StatementType.java | 1 +
.../db/mpp/plan/statement/StatementVisitor.java | 6 +
.../template/AlterSchemaTemplateStatement.java | 85 +++++++
.../impl/DataNodeInternalRPCServiceImpl.java | 16 +-
.../src/main/thrift/confignode.thrift | 7 +
39 files changed, 1846 insertions(+), 303 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index b4c45548ad..c145d4fe21 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -45,6 +45,7 @@ ddlStatement
| createSchemaTemplate | createTimeseriesUsingSchemaTemplate | dropSchemaTemplate | dropTimeseriesOfSchemaTemplate
| showSchemaTemplates | showNodesInSchemaTemplate | showPathsUsingSchemaTemplate | showPathsSetSchemaTemplate
| setSchemaTemplate | unsetSchemaTemplate
+ | alterSchemaTemplate
// TTL
| setTTL | unsetTTL | showTTL | showAllTTL
// Function
@@ -269,6 +270,10 @@ unsetSchemaTemplate
: UNSET SCHEMA TEMPLATE templateName=identifier FROM prefixPath
;
+// ---- Alter Schema Template
+// Appends one or more measurement clauses to an existing schema template, e.g.
+//   ALTER SCHEMA TEMPLATE t1 ADD (s3 INT64 ENCODING=RLE, s4 DOUBLE ENCODING=GORILLA)
+alterSchemaTemplate
+ : ALTER SCHEMA TEMPLATE templateName=identifier ADD LR_BRACKET templateMeasurementClause (COMMA templateMeasurementClause)* RR_BRACKET
+ ;
+
// TTL =============================================================================================
// ---- Set TTL
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index ce1eda69d1..fc0e0fd225 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -99,6 +99,7 @@ import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlanV1;
import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan;
@@ -341,6 +342,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case UnsetTemplate:
plan = new UnsetSchemaTemplatePlan();
break;
+ case ExtendSchemaTemplate:
+ plan = new ExtendSchemaTemplatePlan();
+ break;
case GetNodePathsPartition:
plan = new GetNodePathsPartitionPlan();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index e3408eb7e0..9b88ca9414 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -115,6 +115,7 @@ public enum ConfigPhysicalPlanType {
DropSchemaTemplate((short) 811),
PreSetSchemaTemplate((short) 812),
CommitSetSchemaTemplate((short) 813),
+ ExtendSchemaTemplate((short) 814),
/** Deprecated types for sync, restored them for upgrade */
@Deprecated
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/template/ExtendSchemaTemplatePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/template/ExtendSchemaTemplatePlan.java
new file mode 100644
index 0000000000..69cfc5c381
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/template/ExtendSchemaTemplatePlan.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.template;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.db.metadata.template.alter.TemplateExtendInfo;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Physical plan of type {@link ConfigPhysicalPlanType#ExtendSchemaTemplate}. Its only payload is a
+ * {@link TemplateExtendInfo}, which is written after the plan type on serialization and rebuilt
+ * from the buffer on deserialization.
+ */
+public class ExtendSchemaTemplatePlan extends ConfigPhysicalPlan {
+
+  /** Payload of this plan; unset until a constructor or {@link #deserializeImpl} assigns it. */
+  private TemplateExtendInfo templateExtendInfo;
+
+  /** Creates a plan without payload; {@link #deserializeImpl} assigns the payload afterwards. */
+  public ExtendSchemaTemplatePlan() {
+    super(ConfigPhysicalPlanType.ExtendSchemaTemplate);
+  }
+
+  /**
+   * Creates a plan carrying the given extension description.
+   *
+   * @param templateExtendInfo the extension to apply
+   */
+  public ExtendSchemaTemplatePlan(TemplateExtendInfo templateExtendInfo) {
+    this();
+    this.templateExtendInfo = templateExtendInfo;
+  }
+
+  /** @return the extension description carried by this plan */
+  public TemplateExtendInfo getTemplateExtendInfo() {
+    return this.templateExtendInfo;
+  }
+
+  /** Writes the plan type as a short, then lets the payload serialize itself. */
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeShort(getType().getPlanType());
+    this.templateExtendInfo.serialize(stream);
+  }
+
+  /** Replaces the payload with a fresh instance and fills it from {@code buffer}. */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    this.templateExtendInfo = new TemplateExtendInfo();
+    this.templateExtendInfo.deserialize(buffer);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index b73cceb573..bd5d3209f0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.UnsetSchemaTemplatePlan;
@@ -70,6 +71,11 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUpdateType;
+import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUtil;
+import org.apache.iotdb.db.metadata.template.alter.TemplateExtendInfo;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -726,6 +732,77 @@ public class ClusterSchemaManager {
return getConsensusManager().write(new DropSchemaTemplatePlan(templateName)).getStatus();
}
+ /**
+  * Extends an existing schema template with the measurements described by {@code
+  * templateExtendInfo} and pushes the resulting template to every registered DataNode.
+  *
+  * <p>Steps: (1) when encodings are supplied, validate every data type / encoding pair; (2) look
+  * up the template and return success right away if it already contains every requested
+  * measurement; (3) write an {@link ExtendSchemaTemplatePlan} through consensus; (4) re-read the
+  * template and send it to all registered DataNodes.
+  *
+  * @param templateExtendInfo template name plus the measurements, data types and optional
+  *     encodings to add
+  * @return the success status; the failing status of validation or of the consensus write; an
+  *     {@code UNDEFINED_TEMPLATE} status when the template cannot be found; or an {@code
+  *     EXECUTE_STATEMENT_ERROR} status when a DataNode does not accept the update
+  */
+ public synchronized TSStatus extendSchemaTemplate(TemplateExtendInfo templateExtendInfo) {
+   if (templateExtendInfo.getEncodings() != null) {
+     for (int i = 0; i < templateExtendInfo.getDataTypes().size(); i++) {
+       try {
+         SchemaUtils.checkDataTypeWithEncoding(
+             templateExtendInfo.getDataTypes().get(i), templateExtendInfo.getEncodings().get(i));
+       } catch (MetadataException e) {
+         return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+       }
+     }
+   }
+
+   String templateName = templateExtendInfo.getTemplateName();
+   // Guard the lookup: dereferencing get(0) blindly turns an unknown template name into a
+   // NullPointerException / IndexOutOfBoundsException instead of a proper error status.
+   // NOTE(review): assumes the template list is null or empty when the template does not
+   // exist; confirm against ClusterSchemaInfo#getTemplate.
+   java.util.List<Template> matchedTemplates =
+       clusterSchemaInfo.getTemplate(new GetSchemaTemplatePlan(templateName)).getTemplateList();
+   if (matchedTemplates == null || matchedTemplates.isEmpty()) {
+     return RpcUtils.getStatus(
+         TSStatusCode.UNDEFINED_TEMPLATE,
+         String.format("Undefined template name: %s", templateName));
+   }
+   Template template = matchedTemplates.get(0);
+
+   // Nothing to do when the template already holds every requested measurement.
+   boolean needExtend = false;
+   for (String measurement : templateExtendInfo.getMeasurements()) {
+     if (!template.hasSchema(measurement)) {
+       needExtend = true;
+       break;
+     }
+   }
+
+   if (!needExtend) {
+     return RpcUtils.SUCCESS_STATUS;
+   }
+
+   ExtendSchemaTemplatePlan extendSchemaTemplatePlan =
+       new ExtendSchemaTemplatePlan(templateExtendInfo);
+   TSStatus status = getConsensusManager().write(extendSchemaTemplatePlan).getStatus();
+   if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     return status;
+   }
+
+   // Re-read the template so that the broadcast carries the extended definition.
+   template =
+       clusterSchemaInfo
+           .getTemplate(new GetSchemaTemplatePlan(templateName))
+           .getTemplateList()
+           .get(0);
+
+   TUpdateTemplateReq updateTemplateReq = new TUpdateTemplateReq();
+   updateTemplateReq.setType(TemplateInternalRPCUpdateType.UPDATE_TEMPLATE_INFO.toByte());
+   updateTemplateReq.setTemplateInfo(
+       TemplateInternalRPCUtil.generateUpdateTemplateInfoBytes(template));
+
+   Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+       configManager.getNodeManager().getRegisteredDataNodeLocations();
+
+   AsyncClientHandler<TUpdateTemplateReq, TSStatus> clientHandler =
+       new AsyncClientHandler<>(
+           DataNodeRequestType.UPDATE_TEMPLATE, updateTemplateReq, dataNodeLocationMap);
+   AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+   Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
+   for (Map.Entry<Integer, TSStatus> entry : statusMap.entrySet()) {
+     if (entry.getValue().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       TDataNodeLocation failedDataNode = dataNodeLocationMap.get(entry.getKey());
+       LOGGER.warn(
+           "Failed to sync template {} extension info to DataNode {}",
+           template.getName(),
+           failedDataNode);
+       return RpcUtils.getStatus(
+           TSStatusCode.EXECUTE_STATEMENT_ERROR,
+           String.format(
+               "Failed to sync template %s extension info to DataNode %s",
+               template.getName(), failedDataNode));
+     }
+   }
+   return RpcUtils.SUCCESS_STATUS;
+ }
+
/**
* When some Nodes' states changed during a heartbeat loop, the eventbus in LoadManager will post
* the different NodeStatstics event to SyncManager and ClusterSchemaManager.
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 52a3ccb279..7fd2fb85c5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -101,6 +101,7 @@ import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
@@ -167,6 +168,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.template.TemplateAlterOperationType;
+import org.apache.iotdb.db.metadata.template.alter.TemplateAlterOperationUtil;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -1541,6 +1544,23 @@ public class ConfigManager implements IManager {
}
}
+ /**
+  * Dispatches an alter-schema-template request. After leadership has been confirmed, the
+  * operation type is parsed from the request bytes; {@code EXTEND_TEMPLATE} is forwarded to the
+  * cluster schema manager and any other type yields an {@code UNSUPPORTED_OPERATION} status.
+  *
+  * @param req request whose {@code templateAlterInfo} bytes hold the operation type followed by
+  *     the operation payload
+  * @return the leader-check failure status, the result of the extension, or an unsupported status
+  */
+ @Override
+ public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) {
+   TSStatus leaderStatus = confirmLeader();
+   if (leaderStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     return leaderStatus;
+   }
+
+   // The same buffer is consumed twice: first the operation type, then the payload.
+   ByteBuffer alterInfoBuffer = ByteBuffer.wrap(req.getTemplateAlterInfo());
+   TemplateAlterOperationType operationType =
+       TemplateAlterOperationUtil.parseOperationType(alterInfoBuffer);
+   if (!operationType.equals(TemplateAlterOperationType.EXTEND_TEMPLATE)) {
+     return RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION);
+   }
+   return clusterSchemaManager.extendSchemaTemplate(
+       TemplateAlterOperationUtil.parseTemplateExtendInfo(alterInfoBuffer));
+ }
+
@Override
public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 9726b93b2d..a861635f5b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
@@ -539,6 +540,8 @@ public interface IManager {
/** Drop schema template */
TSStatus dropSchemaTemplate(String templateName);
+ TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req);
+
/*
* delete timeseries
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index bbbe6a879a..7c6fa50350 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -89,6 +89,7 @@ import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRe
import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan;
@@ -386,6 +387,8 @@ public class ConfigPlanExecutor {
return clusterSchemaInfo.unsetSchemaTemplate((UnsetSchemaTemplatePlan) physicalPlan);
case DropSchemaTemplate:
return clusterSchemaInfo.dropSchemaTemplate((DropSchemaTemplatePlan) physicalPlan);
+ case ExtendSchemaTemplate:
+ return clusterSchemaInfo.extendSchemaTemplate((ExtendSchemaTemplatePlan) physicalPlan);
case CreatePipeV2:
return pipeInfo.getPipeTaskInfo().createPipe((CreatePipePlanV2) physicalPlan);
case SetPipeStatusV2:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
index ea28545b8b..aa965b2570 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetTimeParti
import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan;
@@ -59,6 +60,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.metadata.mtree.ConfigMTree;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUtil;
+import org.apache.iotdb.db.metadata.template.alter.TemplateExtendInfo;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -942,6 +944,16 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
}
}
+ /**
+  * Applies the extension carried by {@code extendSchemaTemplatePlan} to the template table.
+  *
+  * @return {@code SUCCESS_STATUS}, or a status built from the error code and message of the
+  *     {@link MetadataException} raised by the template table
+  */
+ public TSStatus extendSchemaTemplate(ExtendSchemaTemplatePlan extendSchemaTemplatePlan) {
+   try {
+     templateTable.extendTemplate(extendSchemaTemplatePlan.getTemplateExtendInfo());
+   } catch (MetadataException e) {
+     return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+   }
+   return RpcUtils.SUCCESS_STATUS;
+ }
+
public Map<String, TDatabaseSchema> getMatchedDatabaseSchemasByOneName(
String[] databasePathPattern) {
Map<String, TDatabaseSchema> schemaMap = new HashMap<>();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java
index 5113f41a31..24d9335445 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java
@@ -23,7 +23,13 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.template.alter.TemplateExtendInfo;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
@@ -46,6 +52,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
public class TemplateTable {
private static final Logger LOGGER = LoggerFactory.getLogger(TemplateTable.class);
@@ -130,6 +138,45 @@ public class TemplateTable {
}
}
+ public void extendTemplate(TemplateExtendInfo templateExtendInfo) throws MetadataException {
+ templateReadWriteLock.writeLock().lock();
+ try {
+ Template template = templateMap.get(templateExtendInfo.getTemplateName());
+ List<String> measurementList = templateExtendInfo.getMeasurements();
+ List<TSDataType> dataTypeList = templateExtendInfo.getDataTypes();
+ List<TSEncoding> encodingList = templateExtendInfo.getEncodings();
+ List<CompressionType> compressionTypeList = templateExtendInfo.getCompressors();
+
+ IMeasurementSchema measurementSchema;
+ for (int i = 0; i < measurementList.size(); i++) {
+ measurementSchema = template.getSchema(measurementList.get(i));
+ if (measurementSchema == null) {
+ template.addMeasurement(
+ measurementList.get(i),
+ dataTypeList.get(i),
+ encodingList == null ? getDefaultEncoding(dataTypeList.get(i)) : encodingList.get(i),
+ compressionTypeList == null
+ ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+ : compressionTypeList.get(i));
+ } else {
+ if (!measurementSchema.getType().equals(dataTypeList.get(i))
+ || (encodingList != null
+ && !measurementSchema.getEncodingType().equals(encodingList.get(i)))
+ || (compressionTypeList != null
+ && !measurementSchema.getCompressor().equals(compressionTypeList.get(i)))) {
+ throw new MetadataException(
+ String.format(
+ "Schema of measurement %s is not compatible with existing measurement in template %s",
+ measurementList.get(i), template.getName()));
+ }
+ }
+ }
+
+ } finally {
+ templateReadWriteLock.writeLock().unlock();
+ }
+ }
+
private void serialize(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(templateIdGenerator.get(), outputStream);
ReadWriteIOUtils.write(templateMap.size(), outputStream);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 2e80a9b0d1..5413818646 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -67,6 +67,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
@@ -845,6 +846,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return configManager.dropSchemaTemplate(req);
}
+ /** RPC entry point for altering a schema template; forwards the request to {@code configManager}. */
+ @Override
+ public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) throws TException {
+ return configManager.alterSchemaTemplate(req);
+ }
+
@Override
public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
return configManager.deleteTimeSeries(req);
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBExtendTemplateIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBExtendTemplateIT.java
new file mode 100644
index 0000000000..faefd2995f
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBExtendTemplateIT.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.schema;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for extending a schema template, either explicitly through {@code ALTER SCHEMA
+ * TEMPLATE ... ADD} or implicitly by inserting measurements the template does not contain yet.
+ */
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBExtendTemplateIT extends AbstractSchemaIT {
+
+  public IoTDBExtendTemplateIT(SchemaTestMode schemaTestMode) {
+    super(schemaTestMode);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+    super.tearDown();
+  }
+
+  /** A device already activated with the template must expose the manually added measurements. */
+  @Test
+  public void testManualExtendTemplate() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      // create database
+      statement.execute("CREATE DATABASE root.db");
+
+      // create schema template
+      statement.execute(
+          "CREATE SCHEMA TEMPLATE t1 (s1 INT64 ENCODING=PLAIN, s2 DOUBLE ENCODING=RLE)");
+
+      statement.execute("SET SCHEMA TEMPLATE t1 to root.db");
+
+      statement.execute("CREATE TIMESERIES USING SCHEMA TEMPLATE on root.db.d1");
+
+      statement.execute(
+          "ALTER SCHEMA TEMPLATE t1 ADD(s3 INT64 ENCODING=RLE, s4 DOUBLE ENCODING=GORILLA)");
+
+      assertQueryResult(
+          statement,
+          "show timeseries",
+          "root.db.d1.s1,null,root.db,INT64,PLAIN,SNAPPY,null,null,null,null,",
+          "root.db.d1.s2,null,root.db,DOUBLE,RLE,SNAPPY,null,null,null,null,",
+          "root.db.d1.s3,null,root.db,INT64,RLE,SNAPPY,null,null,null,null,",
+          "root.db.d1.s4,null,root.db,DOUBLE,GORILLA,SNAPPY,null,null,null,null,");
+    }
+  }
+
+  /**
+   * Inserting unknown measurements below a path with a template must extend the template for all
+   * of its devices; a database without template keeps using plain auto-created timeseries.
+   */
+  @Test
+  public void testAutoExtendTemplate() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      // create database
+      statement.execute("CREATE DATABASE root.db");
+
+      // create schema template
+      statement.execute(
+          "CREATE SCHEMA TEMPLATE t1 (s1 INT64 ENCODING=PLAIN, s2 DOUBLE ENCODING=RLE)");
+
+      statement.execute("SET SCHEMA TEMPLATE t1 to root.db");
+
+      statement.execute("INSERT INTO root.db.d1(time, s1, s3) values(1, 1, 1)");
+      statement.execute("INSERT INTO root.db.d2(time, s4, s5) values(1, 1, 1)");
+      statement.execute("INSERT INTO root.db1.d1(time, s2, s3) values(1, 1, 1)");
+
+      assertQueryResult(
+          statement,
+          "show timeseries",
+          "root.db.d1.s1,null,root.db,INT64,PLAIN,SNAPPY,null,null,null,null,",
+          "root.db.d1.s2,null,root.db,DOUBLE,RLE,SNAPPY,null,null,null,null,",
+          "root.db.d1.s3,null,root.db,FLOAT,GORILLA,SNAPPY,null,null,null,null,",
+          "root.db.d1.s4,null,root.db,FLOAT,GORILLA,SNAPPY,null,null,null,null,",
+          "root.db.d1.s5,null,root.db,FLOAT,GORILLA,SNAPPY,null,null,null,null,",
+          "root.db.d2.s1,null,root.db,INT64,PLAIN,SNAPPY,null,null,null,null,",
+          "root.db.d2.s2,null,root.db,DOUBLE,RLE,SNAPPY,null,null,null,null,",
+          "root.db.d2.s3,null,root.db,FLOAT,GORILLA,SNAPPY,null,null,null,null,",
+          "root.db.d2.s4,null,root.db,FLOAT,GORILLA,SNAPPY,null,null,null,null,",
+          "root.db.d2.s5,null,root.db,FLOAT,GORILLA,SNAPPY,null,null,null,null,",
+          "root.db1.d1.s2,null,root.db1,FLOAT,GORILLA,SNAPPY,null,null,null,null,",
+          "root.db1.d1.s3,null,root.db1,FLOAT,GORILLA,SNAPPY,null,null,null,null,");
+    }
+  }
+
+  /**
+   * Runs {@code sql} and asserts that the result consists of exactly {@code expectedRows}, in any
+   * order. Each row is rendered by joining all column values, every value followed by a comma.
+   */
+  private void assertQueryResult(Statement statement, String sql, String... expectedRows) {
+    Set<String> remaining = new HashSet<>(Arrays.asList(expectedRows));
+    try (ResultSet resultSet = statement.executeQuery(sql)) {
+      ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+      while (resultSet.next()) {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          builder.append(resultSet.getString(i)).append(",");
+        }
+        String row = builder.toString();
+        // remove() is false for rows that are unexpected or were already matched once.
+        Assert.assertTrue("Unexpected or duplicated row: " + row, remaining.remove(row));
+      }
+      assertEquals("Rows missing from result: " + remaining, 0, remaining.size());
+    } catch (SQLException e) {
+      // fail(String) cannot carry the cause, so the trace is printed for diagnosis.
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSchemaTemplateIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSchemaTemplateIT.java
index cbf8b09dba..117e1fa417 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSchemaTemplateIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSchemaTemplateIT.java
@@ -334,7 +334,7 @@ public class IoTDBSchemaTemplateIT extends AbstractSchemaIT {
statement.execute("SET SCHEMA TEMPLATE t1 TO root.sg2.d2");
statement.execute("SET SCHEMA TEMPLATE t2 TO root.sg3.d1");
statement.execute("SET SCHEMA TEMPLATE t2 TO root.sg3.d2");
- statement.execute("INSERT INTO root.sg3.d2.verify(time, show) VALUES (1, 1)");
+ statement.execute("INSERT INTO root.sg3.d2.verify(time, show) ALIGNED VALUES (1, 1)");
try (ResultSet resultSet = statement.executeQuery("SHOW PATHS USING SCHEMA TEMPLATE t1")) {
String resultRecord;
@@ -387,7 +387,7 @@ public class IoTDBSchemaTemplateIT extends AbstractSchemaIT {
Assert.assertEquals(0, expectedResultSet.size());
ResultSet resultSet = statement.executeQuery("SHOW PATHS USING SCHEMA TEMPLATE t2");
- Assert.assertFalse(resultSet.next());
+ Assert.assertTrue(resultSet.next());
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
index 8eba00d606..99e63bfe49 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSchemaTemplateIT.java
@@ -48,6 +48,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -326,4 +327,65 @@ public class IoTDBSessionSchemaTemplateIT {
}
}
}
+
+  /**
+   * Inserts the same three measurements into four devices in one batch: a device already
+   * activated with template1, a device under the template1 path that is not activated yet, a
+   * device under no template at all, and a device under the template2 path. Afterwards every
+   * device/measurement combination must be listed by "show timeseries".
+   */
+  @Test
+  public void testHybridAutoCreateSchema()
+      throws StatementExecutionException, IoTDBConnectionException, IOException {
+    session.createDatabase("root.db");
+
+    Template temp1 = getTemplate("template1");
+    Template temp2 = getTemplate("template2");
+
+    assertEquals("[]", session.showAllTemplates().toString());
+
+    session.createSchemaTemplate(temp1);
+    session.createSchemaTemplate(temp2);
+
+    session.setSchemaTemplate("template1", "root.db.v1");
+    session.createTimeseriesUsingSchemaTemplate(Collections.singletonList("root.db.v1.d1"));
+    session.setSchemaTemplate("template2", "root.db.v4");
+
+    // one record per device, every record carries the same measurements, types and values
+    List<String> deviceIds =
+        Arrays.asList("root.db.v1.d1", "root.db.v1.d2", "root.db.v2.d1", "root.db.v4.d1");
+    List<String> measurements = Arrays.asList("x", "y", "z");
+    List<TSDataType> tsDataTypes =
+        Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.TEXT);
+    List<Object> values = Arrays.asList(1f, 2f, "3");
+
+    List<Long> timestamps = Arrays.asList(1L, 1L, 1L, 1L);
+    List<List<String>> allMeasurements =
+        Arrays.asList(measurements, measurements, measurements, measurements);
+    List<List<TSDataType>> allTsDataTypes =
+        Arrays.asList(tsDataTypes, tsDataTypes, tsDataTypes, tsDataTypes);
+    List<List<Object>> allValues = Arrays.asList(values, values, values, values);
+
+    session.insertRecords(deviceIds, timestamps, allMeasurements, allTsDataTypes, allValues);
+
+    // every device is expected to own every inserted measurement
+    Set<String> expectedSeries = new HashSet<>();
+    for (String deviceId : deviceIds) {
+      for (String measurement : measurements) {
+        expectedSeries.add(deviceId + "." + measurement);
+      }
+    }
+
+    try (SessionDataSet dataSet = session.executeQueryStatement("show timeseries")) {
+      SessionDataSet.DataIterator iterator = dataSet.iterator();
+      while (iterator.next()) {
+        // remove() returns false for an unexpected or duplicated series
+        Assert.assertTrue(expectedSeries.remove(iterator.getString(1)));
+      }
+    }
+
+    Assert.assertTrue(expectedSeries.isEmpty());
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 0fc95f42b0..265a62e002 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
@@ -1651,6 +1652,27 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
throw new TException(MSG_RECONNECTION_FAIL);
}
+ /**
+ * Forwards an alter-schema-template request to the ConfigNode, retrying up to RETRY_NUM times.
+ *
+ * @param req the alter request to forward
+ * @return the status returned by the ConfigNode once updateConfigNodeLeader(status) reports
+ *     false (presumably meaning no leader redirection is required - confirm against that helper)
+ * @throws TException with MSG_RECONNECTION_FAIL when every attempt failed or was redirected
+ */
+ @Override
+ public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.alterSchemaTemplate(req);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ // the RPC itself failed: log it and forget the cached leader before reconnecting
+ logger.warn(
+ "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+ configNode,
+ config.getAddressAndPort(),
+ Thread.currentThread().getStackTrace()[1].getMethodName());
+ configLeader = null;
+ }
+ // reached after a failed call as well as after updateConfigNodeLeader returned true
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index f1830d362b..56f3a5f5aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -70,7 +70,6 @@ import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
-import org.apache.iotdb.db.metadata.cache.DataNodeTemplateSchemaCache;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.IDTableManager;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -1905,7 +1904,6 @@ public class DataRegion implements IDataRegionForQuery {
// delete Last cache record if necessary
// todo implement more precise process
DataNodeSchemaCache.getInstance().invalidateAll();
- DataNodeTemplateSchemaCache.getInstance().invalidateCache();
// write log to impacted working TsFileProcessors
List<WALFlushListener> walListeners =
@@ -2315,7 +2313,6 @@ public class DataRegion implements IDataRegionForQuery {
return;
}
DataNodeSchemaCache.getInstance().invalidateAll();
- DataNodeTemplateSchemaCache.getInstance().invalidateCache();
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeTemplateSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeTemplateSchemaCache.java
index 227d3dace3..39837a504c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeTemplateSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeTemplateSchemaCache.java
@@ -22,6 +22,14 @@ package org.apache.iotdb.db.metadata.cache;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaComputationWithAutoCreation;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -29,6 +37,10 @@ import com.github.benmanes.caffeine.cache.Weigher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DataNodeTemplateSchemaCache {
@@ -36,7 +48,9 @@ public class DataNodeTemplateSchemaCache {
private static final Logger logger = LoggerFactory.getLogger(DataNodeTemplateSchemaCache.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private final Cache<PartialPath, Integer> cache;
+ private final Cache<PartialPath, DeviceCacheEntry> cache;
+
+ private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
// cache update due to activation or clear procedure
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
@@ -47,7 +61,8 @@ public class DataNodeTemplateSchemaCache {
Caffeine.newBuilder()
.maximumWeight(config.getAllocateMemoryForSchemaCache())
.weigher(
- (Weigher<PartialPath, Integer>) (key, val) -> (PartialPath.estimateSize(key) + 16))
+ (Weigher<PartialPath, DeviceCacheEntry>)
+ (key, val) -> (PartialPath.estimateSize(key) + 32))
.build();
}
@@ -76,20 +91,103 @@ public class DataNodeTemplateSchemaCache {
readWriteLock.writeLock().unlock();
}
- public Integer get(PartialPath path) {
- takeReadLock();
- try {
- return cache.getIfPresent(path);
- } finally {
- releaseReadLock();
+  /**
+   * Looks up a single timeseries in the template cache.
+   *
+   * @param fullPath full path of the timeseries; its device path is the cache key
+   * @return a schema tree holding the timeseries and its database when the device is cached and
+   *     the template of that device defines the measurement, otherwise an empty schema tree
+   */
+  public ClusterSchemaTree get(PartialPath fullPath) {
+    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+    DeviceCacheEntry deviceCacheEntry = cache.getIfPresent(fullPath.getDevicePath());
+    if (deviceCacheEntry == null) {
+      return schemaTree;
+    }
+
+    Template template = templateManager.getTemplate(deviceCacheEntry.getTemplateId());
+    MeasurementSchema schema = (MeasurementSchema) template.getSchema(fullPath.getMeasurement());
+    if (schema == null) {
+      // The device is activated, but this measurement is not part of its template. Report a
+      // cache miss instead of appending a measurement without schema to the tree.
+      return schemaTree;
+    }
+
+    schemaTree.appendSingleMeasurement(fullPath, schema, null, null, template.isDirectAligned());
+    schemaTree.setDatabases(Collections.singleton(deviceCacheEntry.getDatabase()));
+    return schemaTree;
+  }
+
+ /**
+ * Checks the measurements of the given computation against the template cache. A device
+ * conforms to the cache when its path is cached as a template activated path; in that case the
+ * alignment of the device and the schema of every measurement defined by the template are
+ * handed to the computation.
+ *
+ * @param computation provides the device path and measurements to check and receives the
+ *     device alignment and the measurement schemas found in the template
+ * @return the indexes (into the measurement array of the computation) of the measurements that
+ *     are not covered by the cache; all indexes when the device is not cached, an empty list
+ *     when every measurement is defined by the template
+ */
+ public List<Integer> conformsToTemplateCache(ISchemaComputationWithAutoCreation computation) {
+ List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+ PartialPath devicePath = computation.getDevicePath();
+ String[] measurements = computation.getMeasurements();
+ DeviceCacheEntry deviceCacheEntry = cache.getIfPresent(devicePath);
+ if (deviceCacheEntry == null) {
+ // device is not cached: every measurement is reported as missing
+ for (int i = 0; i < measurements.length; i++) {
+ indexOfMissingMeasurements.add(i);
+ }
+ return indexOfMissingMeasurements;
}
+
+ // NOTE(review): the template is looked up twice; if the template manager replaces the
+ // template between the two calls, alignment and schema map come from different objects.
+ computation.computeDevice(
+ templateManager.getTemplate(deviceCacheEntry.getTemplateId()).isDirectAligned());
+ Map<String, IMeasurementSchema> templateSchema =
+ templateManager.getTemplate(deviceCacheEntry.getTemplateId()).getSchemaMap();
+ for (int i = 0; i < measurements.length; i++) {
+ if (!templateSchema.containsKey(measurements[i])) {
+ indexOfMissingMeasurements.add(i);
+ continue;
+ }
+ IMeasurementSchema schema = templateSchema.get(measurements[i]);
+ computation.computeMeasurement(
+ i,
+ new IMeasurementSchemaInfo() {
+ @Override
+ public String getName() {
+ return schema.getMeasurementId();
+ }
+
+ // builds a new MeasurementSchema from the template schema on every call
+ @Override
+ public MeasurementSchema getSchema() {
+ return new MeasurementSchema(
+ schema.getMeasurementId(),
+ schema.getType(),
+ schema.getEncodingType(),
+ schema.getCompressor());
+ }
+
+ // no alias is reported for measurements served from the template cache
+ @Override
+ public String getAlias() {
+ return null;
+ }
+ });
+ }
+ return indexOfMissingMeasurements;
}
- public void put(PartialPath path, Integer id) {
- cache.put(path, id);
+ /**
+ * Caches the given path together with its database and the id of its template, replacing any
+ * entry cached for the same path.
+ *
+ * @param path cache key; lookups in this class use device paths
+ * @param database database recorded for the path; must not be null because the cache entry
+ *     interns it
+ * @param id template id; must not be null because it is unboxed to an int
+ */
+ public void put(PartialPath path, String database, Integer id) {
+ cache.put(path, new DeviceCacheEntry(database, id));
}
public void invalidateCache() {
cache.invalidateAll();
}
+
+  /** Immutable cache value: the database of a cached device and the id of its template. */
+  private static class DeviceCacheEntry {
+
+    private final int templateId;
+
+    // interned, since many devices carry the same database name
+    private final String database;
+
+    private DeviceCacheEntry(String database, int templateId) {
+      this.templateId = templateId;
+      this.database = database.intern();
+    }
+
+    public String getDatabase() {
+      return database;
+    }
+
+    public int getTemplateId() {
+      return templateId;
+    }
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
index b1cde92fc2..b618106965 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
@@ -596,6 +596,17 @@ public class ClusterTemplateManager implements ITemplateManager {
}
}
+  /**
+   * Replaces a locally stored template by the one serialized in the given bytes. Parsing and the
+   * update of templateIdMap both happen while the write lock is held.
+   *
+   * @param templateInfo serialized template, in the format read by
+   *     TemplateInternalRPCUtil.parseUpdateTemplateInfoBytes
+   */
+  public void updateTemplateInfo(byte[] templateInfo) {
+    readWriteLock.writeLock().lock();
+    try {
+      ByteBuffer serializedTemplate = ByteBuffer.wrap(templateInfo);
+      Template updatedTemplate =
+          TemplateInternalRPCUtil.parseUpdateTemplateInfoBytes(serializedTemplate);
+      templateIdMap.put(updatedTemplate.getId(), updatedTemplate);
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
@TestOnly
public void putTemplate(Template template) {
templateIdMap.put(template.getId(), template);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
index 33a655b778..9cf66897e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
@@ -34,9 +34,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class Template implements Serializable {
@@ -48,7 +48,7 @@ public class Template implements Serializable {
private transient int rehashCode;
public Template() {
- schemaMap = new HashMap<>();
+ schemaMap = new ConcurrentHashMap<>();
}
public Template(
@@ -70,7 +70,7 @@ public class Template implements Serializable {
boolean isAligned)
throws IllegalPathException {
this.isDirectAligned = isAligned;
- this.schemaMap = new HashMap<>();
+ this.schemaMap = new ConcurrentHashMap<>();
this.name = name;
for (int i = 0; i < measurements.size(); i++) {
IMeasurementSchema schema =
@@ -147,6 +147,14 @@ public class Template implements Serializable {
}
}
+ /**
+ * Adds one measurement to this template. The schema built by constructSchema is stored in
+ * schemaMap under the measurement name, replacing any schema already stored under that name.
+ *
+ * @param measurement name of the measurement, used as key of schemaMap
+ * @param dataType data type of the measurement
+ * @param encoding encoding of the measurement
+ * @param compressionType compression of the measurement
+ */
+ public void addMeasurement(
+ String measurement,
+ TSDataType dataType,
+ TSEncoding encoding,
+ CompressionType compressionType) {
+ schemaMap.put(measurement, constructSchema(measurement, dataType, encoding, compressionType));
+ }
+
// endregion
public void serialize(ByteBuffer buffer) {
@@ -186,7 +194,7 @@ public class Template implements Serializable {
name = ReadWriteIOUtils.readString(buffer);
isDirectAligned = ReadWriteIOUtils.readBool(buffer);
int schemaSize = ReadWriteIOUtils.readInt(buffer);
- schemaMap = new HashMap<>(schemaSize);
+ schemaMap = new ConcurrentHashMap<>(schemaSize);
for (int i = 0; i < schemaSize; i++) {
String schemaName = ReadWriteIOUtils.readString(buffer);
byte flag = ReadWriteIOUtils.readByte(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateAlterOperationType.java
similarity index 71%
copy from server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateAlterOperationType.java
index 7e15b66038..7214c4c318 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateAlterOperationType.java
@@ -25,15 +25,12 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-public enum TemplateInternalRPCUpdateType {
- ADD_TEMPLATE_SET_INFO((byte) 0),
- INVALIDATE_TEMPLATE_SET_INFO((byte) 1),
- ADD_TEMPLATE_PRE_SET_INFO((byte) 2),
- COMMIT_TEMPLATE_SET_INFO((byte) 3);
+public enum TemplateAlterOperationType {
+ EXTEND_TEMPLATE((byte) 0);
private final byte operationType;
- TemplateInternalRPCUpdateType(byte operationType) {
+ TemplateAlterOperationType(byte operationType) {
this.operationType = operationType;
}
@@ -45,21 +42,15 @@ public enum TemplateInternalRPCUpdateType {
ReadWriteIOUtils.write(operationType, stream);
}
- public static TemplateInternalRPCUpdateType deserialize(ByteBuffer buffer) {
+ public static TemplateAlterOperationType deserialize(ByteBuffer buffer) {
byte type = ReadWriteIOUtils.readByte(buffer);
return getType(type);
}
- public static TemplateInternalRPCUpdateType getType(byte type) {
+ public static TemplateAlterOperationType getType(byte type) {
switch (type) {
case 0:
- return ADD_TEMPLATE_SET_INFO;
- case 1:
- return INVALIDATE_TEMPLATE_SET_INFO;
- case 2:
- return ADD_TEMPLATE_PRE_SET_INFO;
- case 3:
- return COMMIT_TEMPLATE_SET_INFO;
+ return EXTEND_TEMPLATE;
default:
throw new IllegalArgumentException("Unknown template update operation type" + type);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java
index 7e15b66038..0f82bc54e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java
@@ -29,7 +29,8 @@ public enum TemplateInternalRPCUpdateType {
ADD_TEMPLATE_SET_INFO((byte) 0),
INVALIDATE_TEMPLATE_SET_INFO((byte) 1),
ADD_TEMPLATE_PRE_SET_INFO((byte) 2),
- COMMIT_TEMPLATE_SET_INFO((byte) 3);
+ COMMIT_TEMPLATE_SET_INFO((byte) 3),
+ UPDATE_TEMPLATE_INFO((byte) 4);
private final byte operationType;
@@ -60,6 +61,8 @@ public enum TemplateInternalRPCUpdateType {
return ADD_TEMPLATE_PRE_SET_INFO;
case 3:
return COMMIT_TEMPLATE_SET_INFO;
+ case 4:
+ return UPDATE_TEMPLATE_INFO;
default:
throw new IllegalArgumentException("Unknown template update operation type" + type);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUtil.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUtil.java
index c70a66812f..bbdb7553e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUtil.java
@@ -118,4 +118,14 @@ public class TemplateInternalRPCUtil {
public static Pair<Integer, String> parseInvalidateTemplateSetInfoBytes(ByteBuffer buffer) {
return new Pair<>(ReadWriteIOUtils.readInt(buffer), ReadWriteIOUtils.readString(buffer));
}
+
+ /**
+ * Serializes the given template into the payload of a template update request.
+ *
+ * @param template the template to serialize
+ * @return the array backing the buffer returned by template.serialize()
+ */
+ // NOTE(review): array() returns the whole backing array; this presumably relies on
+ // Template.serialize() returning an exactly sized buffer - confirm.
+ public static byte[] generateUpdateTemplateInfoBytes(Template template) {
+ return template.serialize().array();
+ }
+
+  /**
+   * Reads a template from the payload of a template update request.
+   *
+   * @param buffer buffer positioned at the start of a serialized template
+   * @return a new template filled from the buffer
+   */
+  public static Template parseUpdateTemplateInfoBytes(ByteBuffer buffer) {
+    Template parsedTemplate = new Template();
+    parsedTemplate.deserialize(buffer);
+    return parsedTemplate;
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/alter/TemplateAlterInfo.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/alter/TemplateAlterInfo.java
new file mode 100644
index 0000000000..0d0aa4a4e7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/alter/TemplateAlterInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.template.alter;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Base class of the information carried by an alter schema template operation. It only knows the
+ * name of the altered template; subclasses append their own fields when serializing.
+ */
+public abstract class TemplateAlterInfo {
+
+  /** Name of the template to alter. */
+  protected String templateName;
+
+  public String getTemplateName() {
+    return this.templateName;
+  }
+
+  /**
+   * Writes the template name to the stream.
+   *
+   * @param outputStream target stream
+   * @throws IOException if writing to the stream fails
+   */
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(this.templateName, outputStream);
+  }
+
+  /**
+   * Reads the template name from the buffer.
+   *
+   * @param buffer buffer positioned at a serialized template name
+   */
+  public void deserialize(ByteBuffer buffer) {
+    templateName = ReadWriteIOUtils.readString(buffer);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/alter/TemplateAlterOperationUtil.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/alter/TemplateAlterOperationUtil.java
new file mode 100644
index 0000000000..0efb8bfa12
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/alter/TemplateAlterOperationUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.template.alter;
+
+import org.apache.iotdb.db.metadata.template.TemplateAlterOperationType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** Helpers to build and parse the binary payload of an alter schema template request. */
+public class TemplateAlterOperationUtil {
+
+  /**
+   * Serializes the operation type followed by the alter info into one byte array.
+   *
+   * @param operationType type of the alter operation, written first
+   * @param templateAlterInfo information of the alter operation, written after the type
+   * @return the serialized request payload
+   * @throws IllegalStateException if serialization fails; this is not expected because the
+   *     target is an in-memory stream
+   */
+  public static byte[] generateExtendTemplateReqInfo(
+      TemplateAlterOperationType operationType, TemplateAlterInfo templateAlterInfo) {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try {
+      operationType.serialize(outputStream);
+      templateAlterInfo.serialize(outputStream);
+    } catch (IOException e) {
+      // Never return a silently truncated payload: fail loudly and keep the cause.
+      throw new IllegalStateException(
+          "Failed to serialize alter info of schema template "
+              + templateAlterInfo.getTemplateName(),
+          e);
+    }
+    return outputStream.toByteArray();
+  }
+
+  /**
+   * Reads the operation type written at the head of a request payload.
+   *
+   * @param buffer buffer positioned at the start of the payload
+   * @return the parsed operation type
+   */
+  public static TemplateAlterOperationType parseOperationType(ByteBuffer buffer) {
+    return TemplateAlterOperationType.deserialize(buffer);
+  }
+
+  /**
+   * Reads the extend info that follows the operation type in a request payload.
+   *
+   * @param buffer buffer positioned right after the operation type
+   * @return the parsed extend info
+   */
+  public static TemplateExtendInfo parseTemplateExtendInfo(ByteBuffer buffer) {
+    TemplateExtendInfo templateExtendInfo = new TemplateExtendInfo();
+    templateExtendInfo.deserialize(buffer);
+    return templateExtendInfo;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/alter/TemplateExtendInfo.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/alter/TemplateExtendInfo.java
new file mode 100644
index 0000000000..1ecdf0c878
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/alter/TemplateExtendInfo.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.template.alter;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Information of an extend schema template operation: the measurements to add to a template,
+ * described by four parallel lists (name, data type, encoding, compressor). The encoding and
+ * compressor lists are optional.
+ */
+public class TemplateExtendInfo extends TemplateAlterInfo {
+
+  private List<String> measurements;
+  private List<TSDataType> dataTypes;
+  private List<TSEncoding> encodings;
+  private List<CompressionType> compressors;
+
+  public TemplateExtendInfo() {}
+
+  /**
+   * Creates an info without measurements; add them with {@link #addMeasurement}.
+   *
+   * @param templateName name of the template to extend
+   */
+  public TemplateExtendInfo(String templateName) {
+    this.templateName = templateName;
+  }
+
+  public TemplateExtendInfo(
+      String templateName,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors) {
+    this.templateName = templateName;
+    this.measurements = measurements;
+    this.dataTypes = dataTypes;
+    this.encodings = encodings;
+    this.compressors = compressors;
+  }
+
+  @Override
+  public String getTemplateName() {
+    return templateName;
+  }
+
+  public List<String> getMeasurements() {
+    return measurements;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public List<TSEncoding> getEncodings() {
+    return encodings;
+  }
+
+  public List<CompressionType> getCompressors() {
+    return compressors;
+  }
+
+  /**
+   * Appends one measurement to the four parallel lists, creating each list on first use.
+   *
+   * @param measurement name of the measurement
+   * @param dataType data type of the measurement
+   * @param encoding encoding of the measurement
+   * @param compressionType compressor of the measurement
+   */
+  public void addMeasurement(
+      String measurement,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressionType) {
+    if (measurements == null) {
+      measurements = new ArrayList<>();
+    }
+    measurements.add(measurement);
+
+    if (dataTypes == null) {
+      dataTypes = new ArrayList<>();
+    }
+    dataTypes.add(dataType);
+
+    if (encodings == null) {
+      encodings = new ArrayList<>();
+    }
+    encodings.add(encoding);
+
+    if (compressors == null) {
+      compressors = new ArrayList<>();
+    }
+    compressors.add(compressionType);
+  }
+
+  /**
+   * Writes the template name, the number of measurements, the measurement names, the data types
+   * and then, each preceded by a one byte presence flag, the encodings and the compressors. An
+   * info that holds no measurement list is written as zero measurements.
+   *
+   * @param outputStream target stream
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void serialize(OutputStream outputStream) throws IOException {
+    super.serialize(outputStream);
+
+    // the name-only constructor leaves the lists null until addMeasurement is called
+    int measurementCount = measurements == null ? 0 : measurements.size();
+    ReadWriteIOUtils.write(measurementCount, outputStream);
+    if (measurementCount > 0) {
+      for (String measurement : measurements) {
+        ReadWriteIOUtils.write(measurement, outputStream);
+      }
+      for (TSDataType dataType : dataTypes) {
+        ReadWriteIOUtils.write(dataType, outputStream);
+      }
+    }
+
+    if (encodings == null || encodings.isEmpty()) {
+      ReadWriteIOUtils.write((byte) 0, outputStream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      for (TSEncoding encoding : encodings) {
+        ReadWriteIOUtils.write(encoding, outputStream);
+      }
+    }
+
+    if (compressors == null || compressors.isEmpty()) {
+      ReadWriteIOUtils.write((byte) 0, outputStream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      for (CompressionType compressionType : compressors) {
+        ReadWriteIOUtils.write(compressionType, outputStream);
+      }
+    }
+  }
+
+  /**
+   * Reads the fields in the order written by {@link #serialize}. The encoding and compressor
+   * lists are null afterwards when their presence flag is not set.
+   *
+   * @param buffer buffer positioned at a serialized extend info
+   */
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    super.deserialize(buffer);
+    int size = ReadWriteIOUtils.readInt(buffer);
+    this.measurements = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      measurements.add(ReadWriteIOUtils.readString(buffer));
+    }
+
+    this.dataTypes = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      dataTypes.add(ReadWriteIOUtils.readDataType(buffer));
+    }
+
+    if (ReadWriteIOUtils.readByte(buffer) == 1) {
+      this.encodings = new ArrayList<>(size);
+      for (int i = 0; i < size; i++) {
+        encodings.add(ReadWriteIOUtils.readEncoding(buffer));
+      }
+    } else {
+      // do not keep a list left over from an earlier use of this instance
+      this.encodings = null;
+    }
+
+    if (ReadWriteIOUtils.readByte(buffer) == 1) {
+      this.compressors = new ArrayList<>(size);
+      for (int i = 0; i < size; i++) {
+        compressors.add(ReadWriteIOUtils.readCompressionType(buffer));
+      }
+    } else {
+      this.compressors = null;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
index 4b4ef0f795..68f4b4782a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.template.ITemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.template.TemplateAlterOperationType;
+import org.apache.iotdb.db.metadata.template.alter.TemplateExtendInfo;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
@@ -35,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.statement.internal.InternalBatchActivateTemp
import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -68,39 +71,13 @@ class AutoCreateSchemaExecutor {
}
// auto create the missing measurements and merge them into given schemaTree
- void autoCreateMissingMeasurements(
+ void autoCreateTimeSeries(
ClusterSchemaTree schemaTree,
PartialPath devicePath,
List<Integer> indexOfTargetMeasurements,
String[] measurements,
Function<Integer, TSDataType> getDataType,
boolean isAligned) {
- // check whether there is template should be activated
- Pair<Template, PartialPath> templateInfo = templateManager.checkTemplateSetInfo(devicePath);
- if (templateInfo != null) {
- Template template = templateInfo.left;
- List<Integer> indexOfMeasurementsNotInTemplate =
- checkMeasurementsInSchemaTemplate(
- devicePath, indexOfTargetMeasurements, measurements, isAligned, template);
- if (indexOfMeasurementsNotInTemplate.size() < indexOfTargetMeasurements.size()) {
- // there are measurements in schema template
- internalActivateTemplate(devicePath);
- for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
- schemaTree.appendSingleMeasurement(
- devicePath.concatNode(entry.getKey()),
- (MeasurementSchema) entry.getValue(),
- null,
- null,
- template.isDirectAligned());
- }
- }
- if (indexOfMeasurementsNotInTemplate.isEmpty()) {
- return;
- }
- // there are measurements need to be created as normal timeseries
- indexOfTargetMeasurements = indexOfMeasurementsNotInTemplate;
- }
-
// auto create the rest missing timeseries
List<String> missingMeasurements = new ArrayList<>(indexOfTargetMeasurements.size());
List<TSDataType> dataTypesOfMissingMeasurement =
@@ -135,6 +112,119 @@ class AutoCreateSchemaExecutor {
}
}
+ void autoCreateTimeSeries( // batch variant: creates the requested series for several devices at once
+ ClusterSchemaTree schemaTree, // handed on to internalCreateTimeSeries; presumably receives the created schema - confirm
+ List<PartialPath> devicePathList, // one entry per request
+ List<Integer> indexOfTargetDevices, // positions in the per-request lists that need creation
+ List<List<Integer>> indexOfTargetMeasurementsList, // parallel to indexOfTargetDevices (read with i)
+ List<String[]> measurementsList, // per request, read with the device index
+ List<TSDataType[]> tsDataTypesList, // per request, read with the device index
+ List<Boolean> isAlignedList) { // per request, read with the device index
+ // group the measurements to create by device path; no template handling happens in this method
+ Map<PartialPath, Pair<Boolean, MeasurementGroup>> devicesNeedAutoCreateTimeSeries =
+ new HashMap<>();
+ int deviceIndex;
+ PartialPath devicePath;
+ List<Integer> indexOfTargetMeasurements;
+ for (int i = 0, size = indexOfTargetDevices.size(); i < size; i++) {
+ deviceIndex = indexOfTargetDevices.get(i);
+ devicePath = devicePathList.get(deviceIndex);
+ indexOfTargetMeasurements = indexOfTargetMeasurementsList.get(i); // indexed by i, not by deviceIndex
+
+ // all target measurements of this device are created as normal timeseries
+ int finalDeviceIndex = deviceIndex; // effectively-final copies for use inside the lambda
+ List<Integer> finalIndexOfMeasurementsNotInTemplate = indexOfTargetMeasurements;
+ devicesNeedAutoCreateTimeSeries.compute(
+ devicePath,
+ (k, v) -> {
+ if (v == null) {
+ v = new Pair<>(isAlignedList.get(finalDeviceIndex), new MeasurementGroup()); // first request seen for a path fixes its aligned flag
+ }
+ MeasurementGroup measurementGroup = v.right;
+ String[] measurements = measurementsList.get(finalDeviceIndex);
+ TSDataType[] tsDataTypes = tsDataTypesList.get(finalDeviceIndex);
+ for (int measurementIndex : finalIndexOfMeasurementsNotInTemplate) {
+ if (tsDataTypes[measurementIndex] == null) { // no data type supplied: this measurement is not created
+ continue;
+ }
+ measurementGroup.addMeasurement(
+ measurements[measurementIndex],
+ tsDataTypes[measurementIndex],
+ getDefaultEncoding(tsDataTypes[measurementIndex]), // encoding derived from the data type
+ TSFileDescriptor.getInstance().getConfig().getCompressor()); // compressor taken from the TsFile config
+ }
+ return v;
+ });
+ }
+
+ if (!devicesNeedAutoCreateTimeSeries.isEmpty()) { // nothing to send when no target device was given
+ internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries);
+ }
+ }
+
+ void autoExtendTemplate( // extends one template with the given measurements and data types
+ String templateName, List<String> measurementList, List<TSDataType> dataTypeList) {
+ internalExtendTemplate(templateName, measurementList, dataTypeList, null, null); // encodings and compressors passed as null; NOTE(review): presumably defaulted downstream - confirm
+ }
+
+ void autoExtendTemplate(Map<String, TemplateExtendInfo> templateExtendInfoMap) { // batch variant: one extend request per map entry
+ TemplateExtendInfo templateExtendInfo;
+ for (Map.Entry<String, TemplateExtendInfo> entry : templateExtendInfoMap.entrySet()) {
+ templateExtendInfo = entry.getValue();
+ internalExtendTemplate(
+ entry.getKey(), // the map key is used as the template name
+ templateExtendInfo.getMeasurements(),
+ templateExtendInfo.getDataTypes(),
+ templateExtendInfo.getEncodings(),
+ templateExtendInfo.getCompressors());
+ }
+ }
+
+ void autoActivateTemplate(ClusterSchemaTree schemaTree, PartialPath devicePath, int templateId) { // activates the template on one device and mirrors its measurements into schemaTree
+ internalActivateTemplate(devicePath);
+ Template template = templateManager.getTemplate(templateId); // looked up only after activation has been issued
+ for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
+ schemaTree.appendSingleMeasurement( // one tree entry per measurement of the template
+ devicePath.concatNode(entry.getKey()), // full path = device path + measurement name
+ (MeasurementSchema) entry.getValue(),
+ null, // NOTE(review): meaning of the two null arguments is not visible here - confirm against appendSingleMeasurement
+ null,
+ template.isDirectAligned());
+ }
+ }
+
+ void autoActivateTemplate( // batch variant: activates templates on several devices, then fills schemaTree
+ ClusterSchemaTree schemaTree,
+ List<PartialPath> deviceList,
+ List<Pair<Template, PartialPath>> templateSetInfoList) { // parallel to deviceList (both read with the same index)
+ Map<PartialPath, Pair<Template, PartialPath>> devicesNeedActivateTemplate = new HashMap<>();
+ for (int i = 0; i < deviceList.size(); i++) {
+ devicesNeedActivateTemplate.put( // a device path occurring twice keeps only its last entry
+ deviceList.get(i),
+ new Pair<>(
+ templateManager.getTemplate(templateSetInfoList.get(i).left.getId()), // re-read the template by id instead of reusing the passed instance
+ templateSetInfoList.get(i).right));
+ }
+ internalActivateTemplate(devicesNeedActivateTemplate);
+ PartialPath devicePath;
+ Template template;
+ for (Map.Entry<PartialPath, Pair<Template, PartialPath>> entry :
+ devicesNeedActivateTemplate.entrySet()) {
+ devicePath = entry.getKey();
+ // take the latest template
+ template = templateManager.getTemplate(entry.getValue().left.getId()); // second lookup, made after activation
+ for (Map.Entry<String, IMeasurementSchema> measurementEntry :
+ template.getSchemaMap().entrySet()) {
+ schemaTree.appendSingleMeasurement(
+ devicePath.concatNode(measurementEntry.getKey()), // full path = device path + measurement name
+ (MeasurementSchema) measurementEntry.getValue(),
+ null, // NOTE(review): meaning of the two null arguments is not visible here - confirm against appendSingleMeasurement
+ null,
+ template.isDirectAligned());
+ }
+ }
+ }
+
void autoCreateMissingMeasurements(
ClusterSchemaTree schemaTree,
List<PartialPath> devicePathList,
@@ -156,6 +246,7 @@ class AutoCreateSchemaExecutor {
Pair<Template, PartialPath> templateInfo;
Template template;
List<Integer> indexOfMeasurementsNotInTemplate;
+ Map<String, TemplateExtendInfo> templateExtendInfoMap = new HashMap<>();
for (int i = 0, size = indexOfTargetDevices.size(); i < size; i++) {
deviceIndex = indexOfTargetDevices.get(i);
devicePath = devicePathList.get(deviceIndex);
@@ -167,26 +258,9 @@ class AutoCreateSchemaExecutor {
}
if (templateInfo == null) {
- indexOfMeasurementsNotInTemplate = indexOfTargetMeasurements;
- } else {
- template = templateInfo.left;
- indexOfMeasurementsNotInTemplate =
- checkMeasurementsInSchemaTemplate(
- devicePath,
- indexOfTargetMeasurements,
- measurementsList.get(deviceIndex),
- isAlignedList.get(deviceIndex),
- template);
- if (indexOfMeasurementsNotInTemplate.size() < indexOfTargetMeasurements.size()) {
- // there are measurements in schema template
- devicesNeedActivateTemplate.putIfAbsent(devicePath, templateInfo);
- }
- }
-
- if (!indexOfMeasurementsNotInTemplate.isEmpty()) {
// there are measurements need to be created as normal timeseries
int finalDeviceIndex = deviceIndex;
- List<Integer> finalIndexOfMeasurementsNotInTemplate = indexOfMeasurementsNotInTemplate;
+ List<Integer> finalIndexOfMeasurementsNotInTemplate = indexOfTargetMeasurements;
devicesNeedAutoCreateTimeSeries.compute(
devicePath,
(k, v) -> {
@@ -216,6 +290,71 @@ class AutoCreateSchemaExecutor {
}
return v;
});
+ } else {
+ template = templateInfo.left;
+ indexOfMeasurementsNotInTemplate =
+ checkMeasurementsInSchemaTemplate(
+ devicePath,
+ indexOfTargetMeasurements,
+ measurementsList.get(deviceIndex),
+ isAlignedList.get(deviceIndex),
+ template);
+ if (schemaTree.getMatchedDevices(devicePath).isEmpty()) {
+ // not activated yet
+ devicesNeedActivateTemplate.putIfAbsent(devicePath, templateInfo);
+ }
+
+ if (!indexOfMeasurementsNotInTemplate.isEmpty()) {
+ List<Integer> finalIndexOfMeasurementsNotInTemplate1 = indexOfMeasurementsNotInTemplate;
+ int finalDeviceIndex1 = deviceIndex;
+ templateExtendInfoMap.compute(
+ template.getName(),
+ (k, v) -> {
+ TemplateExtendInfo templateExtendInfo;
+ if (v == null) {
+ templateExtendInfo = new TemplateExtendInfo(k);
+ } else {
+ templateExtendInfo = v;
+ }
+
+ String measurement;
+ TSDataType dataType;
+ TSEncoding encoding;
+ CompressionType compressionType;
+ for (int index : finalIndexOfMeasurementsNotInTemplate1) {
+ measurement = measurementsList.get(finalDeviceIndex1)[index];
+ dataType = tsDataTypesList.get(finalDeviceIndex1)[index];
+ if (encodingsList != null && encodingsList.get(finalDeviceIndex1) != null) {
+ encoding = encodingsList.get(finalDeviceIndex1)[index];
+ } else {
+ encoding = getDefaultEncoding(dataType);
+ }
+ if (compressionTypesList != null
+ && compressionTypesList.get(finalDeviceIndex1) != null) {
+ compressionType = compressionTypesList.get(finalDeviceIndex1)[index];
+ } else {
+ compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
+ }
+ templateExtendInfo.addMeasurement(
+ measurement, dataType, encoding, compressionType);
+ }
+ return templateExtendInfo;
+ });
+ }
+ }
+ }
+
+ if (!templateExtendInfoMap.isEmpty()) {
+ for (TemplateExtendInfo templateExtendInfo : templateExtendInfoMap.values()) {
+ internalExtendTemplate(
+ templateExtendInfo.getTemplateName(),
+ templateExtendInfo.getMeasurements(),
+ templateExtendInfo.getDataTypes(),
+ templateExtendInfo.getEncodings(),
+ templateExtendInfo.getCompressors());
+ }
+ for (Pair<Template, PartialPath> value : devicesNeedActivateTemplate.values()) {
+ value.left = templateManager.getTemplate(value.left.getId());
}
}
@@ -224,7 +363,8 @@ class AutoCreateSchemaExecutor {
for (Map.Entry<PartialPath, Pair<Template, PartialPath>> entry :
devicesNeedActivateTemplate.entrySet()) {
devicePath = entry.getKey();
- template = entry.getValue().left;
+ // take the latest template
+ template = templateManager.getTemplate(entry.getValue().left.getId());
for (Map.Entry<String, IMeasurementSchema> measurementEntry :
template.getSchemaMap().entrySet()) {
schemaTree.appendSingleMeasurement(
@@ -415,4 +555,26 @@ class AutoCreateSchemaExecutor {
}
}
}
+
+ private void internalExtendTemplate( // runs an EXTEND_TEMPLATE alter statement; throws SemanticException on a non-success status
+ String templateName,
+ List<String> measurementList,
+ List<TSDataType> dataTypeList,
+ List<TSEncoding> encodingList, // callers in this class may pass null
+ List<CompressionType> compressionTypeList) { // callers in this class may pass null
+
+ ExecutionResult executionResult =
+ statementExecutor.apply( // the statement is executed through the injected executor function
+ new AlterSchemaTemplateStatement(
+ templateName,
+ measurementList,
+ dataTypeList,
+ encodingList,
+ compressionTypeList,
+ TemplateAlterOperationType.EXTEND_TEMPLATE));
+ TSStatus status = executionResult.status;
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // every code other than SUCCESS_STATUS is treated as failure
+ throw new SemanticException(new IoTDBException(status.getMessage(), status.getCode())); // original message and code are kept in the wrapped cause
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 21983ddc95..18a860f2d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
-import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
@@ -40,7 +39,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
@@ -94,6 +92,15 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
config.getQueryTimeoutThreshold()),
this::cacheUpdater);
+ private final NormalSchemaFetcher normalSchemaFetcher =
+ new NormalSchemaFetcher(schemaCache, autoCreateSchemaExecutor, clusterSchemaFetchExecutor);
+ private final TemplateSchemaFetcher templateSchemaFetcher =
+ new TemplateSchemaFetcher(
+ templateManager,
+ templateSchemaCache,
+ autoCreateSchemaExecutor,
+ clusterSchemaFetchExecutor);
+
private static final class ClusterSchemaFetcherHolder {
private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
@@ -133,7 +140,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
ClusterSchemaTree cachedSchema;
Set<String> storageGroupSet = new HashSet<>();
for (PartialPath fullPath : fullPathList) {
- cachedSchema = schemaCache.get(fullPath);
+ cachedSchema = templateSchemaCache.get(fullPath);
+ if (cachedSchema.isEmpty()) {
+ cachedSchema = schemaCache.get(fullPath);
+ }
if (cachedSchema.isEmpty()) {
isAllCached = false;
break;
@@ -160,59 +170,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(patternTree, true);
}
- /**
- * CONFORM indicates that the provided devicePath had been cached as a template activated path,
- * ensuring that the alignment of the device, as well as the name and schema of every measurement
- * are consistent with the cache.
- *
- * @param computation
- * @param devicePath derives from computation
- * @param measurements derives from computation
- * @return true if conform to template cache, which means no need to fetch or create anymore
- */
- private boolean conformsToTemplateCache(
- ISchemaComputationWithAutoCreation computation,
- PartialPath devicePath,
- String[] measurements) {
- if (templateSchemaCache.get(devicePath) == null) {
- return false;
- }
-
- computation.computeDevice(
- templateManager.getTemplate(templateSchemaCache.get(devicePath)).isDirectAligned());
- Map<String, IMeasurementSchema> templateSchema =
- templateManager.getTemplate(templateSchemaCache.get(devicePath)).getSchemaMap();
- for (int i = 0; i < measurements.length; i++) {
- if (!templateSchema.containsKey(measurements[i])) {
- return false;
- }
- IMeasurementSchema schema = templateSchema.get(measurements[i]);
- computation.computeMeasurement(
- i,
- new IMeasurementSchemaInfo() {
- @Override
- public String getName() {
- return schema.getMeasurementId();
- }
-
- @Override
- public MeasurementSchema getSchema() {
- return new MeasurementSchema(
- schema.getMeasurementId(),
- schema.getType(),
- schema.getTimeTSEncoding(),
- schema.getCompressor());
- }
-
- @Override
- public String getAlias() {
- return null;
- }
- });
- }
- return true;
- }
-
/**
* Store the fetched schema in either the schemaCache or templateSchemaCache, depending on its
* associated device.
@@ -235,7 +192,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
templateInfo = Optional.ofNullable(templateManager.checkTemplateSetInfo(devicePath));
if (templateInfo.isPresent()) {
- templateSchemaCache.put(devicePath, templateInfo.get().left.getId());
+ templateSchemaCache.put(
+ devicePath, tree.getBelongedDatabase(devicePath), templateInfo.get().left.getId());
templateDevices.add(devicePath);
} else {
schemaCache.putSingleMeasurementPath(tree.getBelongedDatabase(path), path);
@@ -252,52 +210,25 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
schemaCache.takeReadLock();
templateSchemaCache.takeReadLock();
try {
- PartialPath devicePath = schemaComputationWithAutoCreation.getDevicePath();
- String[] measurements = schemaComputationWithAutoCreation.getMeasurements();
-
- if (conformsToTemplateCache(schemaComputationWithAutoCreation, devicePath, measurements)) {
- return;
- }
-
- List<Integer> indexOfMissingMeasurements =
- schemaCache.compute(schemaComputationWithAutoCreation);
- // all schema can be taken from cache
- if (indexOfMissingMeasurements.isEmpty()) {
- return;
+ Pair<Template, PartialPath> templateSetInfo =
+ templateManager.checkTemplateSetInfo(schemaComputationWithAutoCreation.getDevicePath());
+ List<Integer> indexOfMissingMeasurements;
+ if (templateSetInfo == null) {
+ // normal timeseries
+ indexOfMissingMeasurements =
+ normalSchemaFetcher.processNormalTimeSeries(schemaComputationWithAutoCreation);
+ } else {
+ // template timeseries
+ indexOfMissingMeasurements =
+ templateSchemaFetcher.processTemplateTimeSeries(
+ templateSetInfo, schemaComputationWithAutoCreation);
}
- // try fetch the missing schema from remote and cache fetched schema
- ClusterSchemaTree remoteSchemaTree =
- clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
- devicePath, measurements, indexOfMissingMeasurements);
- // check and compute the fetched schema
- indexOfMissingMeasurements =
- remoteSchemaTree.compute(schemaComputationWithAutoCreation, indexOfMissingMeasurements);
-
// all schema has been taken and processed
if (indexOfMissingMeasurements.isEmpty()) {
return;
}
- // auto create and process the missing schema
- if (config.isAutoCreateSchemaEnabled()) {
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- autoCreateSchemaExecutor.autoCreateMissingMeasurements(
- schemaTree,
- devicePath,
- indexOfMissingMeasurements,
- measurements,
- schemaComputationWithAutoCreation::getDataType,
- schemaComputationWithAutoCreation.isAligned());
- indexOfMissingMeasurements =
- schemaTree.compute(schemaComputationWithAutoCreation, indexOfMissingMeasurements);
-
- // all schema has been taken and processed
- if (indexOfMissingMeasurements.isEmpty()) {
- return;
- }
- }
-
// offer null for the rest missing schema processing
for (int index : indexOfMissingMeasurements) {
schemaComputationWithAutoCreation.computeMeasurement(index, null);
@@ -317,135 +248,28 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
templateSchemaCache.takeReadLock();
try {
- // filter all computation, remove those inside template
- List<ISchemaComputationWithAutoCreation>
- schemaComputationWithAutoCreationListOutisdeTemplate = new ArrayList<>();
-
+ List<ISchemaComputationWithAutoCreation> normalTimeSeriesRequestList = new ArrayList<>();
+ List<ISchemaComputationWithAutoCreation> templateTimeSeriesRequestList = new ArrayList<>();
+ List<Pair<Template, PartialPath>> templateSetInfoList = new ArrayList<>();
+ Pair<Template, PartialPath> templateSetInfo;
for (ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation :
schemaComputationWithAutoCreationList) {
- if (!conformsToTemplateCache(
- schemaComputationWithAutoCreation,
- schemaComputationWithAutoCreation.getDevicePath(),
- schemaComputationWithAutoCreation.getMeasurements())) {
- schemaComputationWithAutoCreationListOutisdeTemplate.add(
- schemaComputationWithAutoCreation);
+ templateSetInfo =
+ templateManager.checkTemplateSetInfo(schemaComputationWithAutoCreation.getDevicePath());
+ if (templateSetInfo == null) {
+ normalTimeSeriesRequestList.add(schemaComputationWithAutoCreation);
+ } else {
+ templateTimeSeriesRequestList.add(schemaComputationWithAutoCreation);
+ templateSetInfoList.add(templateSetInfo);
}
}
- List<List<Integer>> indexOfMissingMeasurementsList =
- new ArrayList<>(schemaComputationWithAutoCreationListOutisdeTemplate.size());
- List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
- ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation;
- List<Integer> indexOfMissingMeasurements;
- for (int i = 0, size = schemaComputationWithAutoCreationListOutisdeTemplate.size();
- i < size;
- i++) {
- schemaComputationWithAutoCreation =
- schemaComputationWithAutoCreationListOutisdeTemplate.get(i);
- indexOfMissingMeasurements = schemaCache.compute(schemaComputationWithAutoCreation);
- if (!indexOfMissingMeasurements.isEmpty()) {
- indexOfDevicesWithMissingMeasurements.add(i);
- indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
- }
- }
-
- // all schema can be taken from cache
- if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
- return;
- }
-
- // try fetch the missing schema from remote
- ClusterSchemaTree remoteSchemaTree =
- clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
- schemaComputationWithAutoCreationListOutisdeTemplate.stream()
- .map(ISchemaComputationWithAutoCreation::getDevicePath)
- .collect(Collectors.toList()),
- schemaComputationWithAutoCreationListOutisdeTemplate.stream()
- .map(ISchemaComputationWithAutoCreation::getMeasurements)
- .collect(Collectors.toList()),
- indexOfDevicesWithMissingMeasurements,
- indexOfMissingMeasurementsList);
- // check and compute the fetched schema
- List<Integer> indexOfDevicesNeedAutoCreateSchema = new ArrayList<>();
- List<List<Integer>> indexOfMeasurementsNeedAutoCreate = new ArrayList<>();
- for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
- schemaComputationWithAutoCreation =
- schemaComputationWithAutoCreationListOutisdeTemplate.get(
- indexOfDevicesWithMissingMeasurements.get(i));
- indexOfMissingMeasurements =
- remoteSchemaTree.compute(
- schemaComputationWithAutoCreation, indexOfMissingMeasurementsList.get(i));
- if (!indexOfMissingMeasurements.isEmpty()) {
- indexOfDevicesNeedAutoCreateSchema.add(indexOfDevicesWithMissingMeasurements.get(i));
- indexOfMeasurementsNeedAutoCreate.add(indexOfMissingMeasurements);
- }
- }
-
- // all schema has been taken and processed
- if (indexOfDevicesNeedAutoCreateSchema.isEmpty()) {
- return;
- }
-
- // auto create and process the missing schema
- if (config.isAutoCreateSchemaEnabled()) {
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- autoCreateSchemaExecutor.autoCreateMissingMeasurements(
- schemaTree,
- schemaComputationWithAutoCreationListOutisdeTemplate.stream()
- .map(ISchemaComputationWithAutoCreation::getDevicePath)
- .collect(Collectors.toList()),
- indexOfDevicesNeedAutoCreateSchema,
- indexOfMeasurementsNeedAutoCreate,
- schemaComputationWithAutoCreationListOutisdeTemplate.stream()
- .map(ISchemaComputationWithAutoCreation::getMeasurements)
- .collect(Collectors.toList()),
- schemaComputationWithAutoCreationListOutisdeTemplate.stream()
- .map(
- o -> {
- TSDataType[] dataTypes = new TSDataType[o.getMeasurements().length];
- for (int i = 0, length = dataTypes.length; i < length; i++) {
- dataTypes[i] = o.getDataType(i);
- }
- return dataTypes;
- })
- .collect(Collectors.toList()),
- null,
- null,
- schemaComputationWithAutoCreationListOutisdeTemplate.stream()
- .map(ISchemaComputationWithAutoCreation::isAligned)
- .collect(Collectors.toList()));
- indexOfDevicesWithMissingMeasurements = new ArrayList<>();
- indexOfMissingMeasurementsList = new ArrayList<>();
- for (int i = 0; i < indexOfDevicesNeedAutoCreateSchema.size(); i++) {
- schemaComputationWithAutoCreation =
- schemaComputationWithAutoCreationListOutisdeTemplate.get(
- indexOfDevicesNeedAutoCreateSchema.get(i));
- indexOfMissingMeasurements =
- schemaTree.compute(
- schemaComputationWithAutoCreation, indexOfMeasurementsNeedAutoCreate.get(i));
- if (!indexOfMissingMeasurements.isEmpty()) {
- indexOfDevicesWithMissingMeasurements.add(indexOfDevicesNeedAutoCreateSchema.get(i));
- indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
- }
- }
-
- // all schema has been taken and processed
- if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
- return;
- }
- } else {
- indexOfDevicesWithMissingMeasurements = indexOfDevicesNeedAutoCreateSchema;
- indexOfMissingMeasurementsList = indexOfMeasurementsNeedAutoCreate;
+ if (!normalTimeSeriesRequestList.isEmpty()) {
+ normalSchemaFetcher.processNormalTimeSeries(normalTimeSeriesRequestList);
}
-
- // offer null for the rest missing schema processing
- for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
- schemaComputationWithAutoCreation =
- schemaComputationWithAutoCreationListOutisdeTemplate.get(
- indexOfDevicesWithMissingMeasurements.get(i));
- for (int index : indexOfMissingMeasurementsList.get(i)) {
- schemaComputationWithAutoCreation.computeMeasurement(index, null);
- }
+ if (!templateTimeSeriesRequestList.isEmpty()) {
+ templateSchemaFetcher.processTemplateTimeSeries(
+ templateSetInfoList, templateTimeSeriesRequestList);
}
} finally {
schemaCache.releaseReadLock();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
new file mode 100644
index 0000000000..e162e88e60
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+class NormalSchemaFetcher { // resolves schema per request: local cache, then remote fetch, then optional auto-creation
+
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); // read for isAutoCreateSchemaEnabled()
+
+ private final DataNodeSchemaCache schemaCache; // first lookup stage
+
+ private final AutoCreateSchemaExecutor autoCreateSchemaExecutor; // last stage, used only when auto-create is enabled
+ private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor; // second stage: remote fetch
+
+ NormalSchemaFetcher( // plain dependency wiring; no work is done at construction
+ DataNodeSchemaCache schemaCache,
+ AutoCreateSchemaExecutor autoCreateSchemaExecutor,
+ ClusterSchemaFetchExecutor clusterSchemaFetchExecutor) {
+ this.schemaCache = schemaCache;
+ this.autoCreateSchemaExecutor = autoCreateSchemaExecutor;
+ this.clusterSchemaFetchExecutor = clusterSchemaFetchExecutor;
+ }
+
+ List<Integer> processNormalTimeSeries( // single-request variant; returns the measurement indexes still unresolved (empty if none)
+ ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
+ List<Integer> indexOfMissingMeasurements =
+ schemaCache.compute(schemaComputationWithAutoCreation); // stage 1: local cache
+ // all schema can be taken from cache
+ if (indexOfMissingMeasurements.isEmpty()) {
+ return indexOfMissingMeasurements;
+ }
+
+ // try fetch the missing schema from remote and cache fetched schema
+ ClusterSchemaTree remoteSchemaTree =
+ clusterSchemaFetchExecutor.fetchSchemaOfOneDevice( // stage 2: only the cache misses are requested
+ schemaComputationWithAutoCreation.getDevicePath(),
+ schemaComputationWithAutoCreation.getMeasurements(),
+ indexOfMissingMeasurements);
+ // check and compute the fetched schema
+ indexOfMissingMeasurements =
+ remoteSchemaTree.compute(schemaComputationWithAutoCreation, indexOfMissingMeasurements);
+
+ // all schema has been taken and processed
+ if (indexOfMissingMeasurements.isEmpty()) {
+ return indexOfMissingMeasurements;
+ }
+
+ // auto create and process the missing schema
+ if (config.isAutoCreateSchemaEnabled()) { // stage 3 is skipped entirely when auto-create is disabled
+ ClusterSchemaTree schemaTree = new ClusterSchemaTree(); // fresh tree passed to the executor, then computed against below
+ autoCreateSchemaExecutor.autoCreateTimeSeries(
+ schemaTree,
+ schemaComputationWithAutoCreation.getDevicePath(),
+ indexOfMissingMeasurements,
+ schemaComputationWithAutoCreation.getMeasurements(),
+ schemaComputationWithAutoCreation::getDataType,
+ schemaComputationWithAutoCreation.isAligned());
+ indexOfMissingMeasurements =
+ schemaTree.compute(schemaComputationWithAutoCreation, indexOfMissingMeasurements);
+ }
+
+ return indexOfMissingMeasurements; // this variant does not offer null for leftovers; it only returns them
+ }
+
+ void processNormalTimeSeries( // batch variant; offers null itself for every measurement left unresolved
+ List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList) {
+ List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>(); // positions in the request list
+ List<List<Integer>> indexOfMissingMeasurementsList = // parallel to the list above, not to the request list
+ new ArrayList<>(schemaComputationWithAutoCreationList.size());
+
+ ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation;
+ List<Integer> indexOfMissingMeasurements;
+ for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
+ schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
+ indexOfMissingMeasurements = schemaCache.compute(schemaComputationWithAutoCreation); // stage 1: local cache
+ if (!indexOfMissingMeasurements.isEmpty()) { // only requests with cache misses are tracked further
+ indexOfDevicesWithMissingMeasurements.add(i);
+ indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
+ }
+ }
+
+ // all schema can be taken from cache
+ if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
+ return;
+ }
+
+ // try fetch the missing schema from remote
+ ClusterSchemaTree remoteSchemaTree =
+ clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices( // stage 2: full per-request lists plus the two index lists selecting the misses
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getDevicePath)
+ .collect(Collectors.toList()),
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getMeasurements)
+ .collect(Collectors.toList()),
+ indexOfDevicesWithMissingMeasurements,
+ indexOfMissingMeasurementsList);
+ // check and compute the fetched schema
+ List<Integer> indexOfDevicesNeedAutoCreateSchema = new ArrayList<>(); // requests still incomplete after the remote fetch
+ List<List<Integer>> indexOfMeasurementsNeedAutoCreate = new ArrayList<>(); // parallel to the list above
+ for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
+ schemaComputationWithAutoCreation =
+ schemaComputationWithAutoCreationList.get(indexOfDevicesWithMissingMeasurements.get(i));
+ indexOfMissingMeasurements =
+ remoteSchemaTree.compute(
+ schemaComputationWithAutoCreation, indexOfMissingMeasurementsList.get(i));
+ if (!indexOfMissingMeasurements.isEmpty()) {
+ indexOfDevicesNeedAutoCreateSchema.add(indexOfDevicesWithMissingMeasurements.get(i));
+ indexOfMeasurementsNeedAutoCreate.add(indexOfMissingMeasurements);
+ }
+ }
+
+ // all schema has been taken and processed
+ if (indexOfDevicesNeedAutoCreateSchema.isEmpty()) {
+ return;
+ }
+
+ // auto create and process the missing schema
+ if (config.isAutoCreateSchemaEnabled()) {
+ ClusterSchemaTree schemaTree = new ClusterSchemaTree(); // fresh tree passed to the executor, then computed against below
+ autoCreateSchemaExecutor.autoCreateTimeSeries(
+ schemaTree,
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getDevicePath)
+ .collect(Collectors.toList()),
+ indexOfDevicesNeedAutoCreateSchema,
+ indexOfMeasurementsNeedAutoCreate,
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getMeasurements)
+ .collect(Collectors.toList()),
+ schemaComputationWithAutoCreationList.stream()
+ .map(
+ o -> { // materialise each request's data types as an array, one slot per measurement
+ TSDataType[] dataTypes = new TSDataType[o.getMeasurements().length];
+ for (int i = 0, length = dataTypes.length; i < length; i++) {
+ dataTypes[i] = o.getDataType(i);
+ }
+ return dataTypes;
+ })
+ .collect(Collectors.toList()),
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::isAligned)
+ .collect(Collectors.toList()));
+ indexOfDevicesWithMissingMeasurements = new ArrayList<>(); // reused to hold what is still missing after auto-create
+ indexOfMissingMeasurementsList = new ArrayList<>();
+ for (int i = 0; i < indexOfDevicesNeedAutoCreateSchema.size(); i++) {
+ schemaComputationWithAutoCreation =
+ schemaComputationWithAutoCreationList.get(indexOfDevicesNeedAutoCreateSchema.get(i));
+ indexOfMissingMeasurements =
+ schemaTree.compute(
+ schemaComputationWithAutoCreation, indexOfMeasurementsNeedAutoCreate.get(i));
+ if (!indexOfMissingMeasurements.isEmpty()) {
+ indexOfDevicesWithMissingMeasurements.add(indexOfDevicesNeedAutoCreateSchema.get(i));
+ indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
+ }
+ }
+
+ // all schema has been taken and processed
+ if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
+ return;
+ }
+ } else {
+ indexOfDevicesWithMissingMeasurements = indexOfDevicesNeedAutoCreateSchema; // auto-create disabled: everything still missing falls through to the null pass
+ indexOfMissingMeasurementsList = indexOfMeasurementsNeedAutoCreate;
+ }
+
+ // offer null for the rest missing schema processing
+ for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
+ schemaComputationWithAutoCreation =
+ schemaComputationWithAutoCreationList.get(indexOfDevicesWithMissingMeasurements.get(i));
+ for (int index : indexOfMissingMeasurementsList.get(i)) {
+ schemaComputationWithAutoCreation.computeMeasurement(index, null); // null is handed to the computation for each unresolved measurement
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/TemplateSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/TemplateSchemaFetcher.java
new file mode 100644
index 0000000000..3f94243861
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/TemplateSchemaFetcher.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.cache.DataNodeTemplateSchemaCache;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.template.alter.TemplateExtendInfo;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
+/**
+ * Resolves the schema of measurements written to devices that fall under a schema template.
+ *
+ * <p>Both entry points follow the same sequence: measurements the template does not define are
+ * collected and, if auto schema creation is enabled, offered to {@link AutoCreateSchemaExecutor}
+ * as a template extension; the request is then checked against {@link
+ * DataNodeTemplateSchemaCache}; what is still missing is fetched through {@link
+ * ClusterSchemaFetchExecutor}; finally, if auto schema creation is enabled, the template is
+ * auto-activated on the devices that remain unresolved.
+ */
+class TemplateSchemaFetcher {
+
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private final ITemplateManager templateManager;
+ private final DataNodeTemplateSchemaCache templateSchemaCache;
+
+ private final AutoCreateSchemaExecutor autoCreateSchemaExecutor;
+ private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor;
+
+ /** Stores the collaborators; no other work is done at construction time. */
+ TemplateSchemaFetcher(
+ ITemplateManager templateManager,
+ DataNodeTemplateSchemaCache templateSchemaCache,
+ AutoCreateSchemaExecutor autoCreateSchemaExecutor,
+ ClusterSchemaFetchExecutor clusterSchemaFetchExecutor) {
+ this.templateManager = templateManager;
+ this.templateSchemaCache = templateSchemaCache;
+ this.autoCreateSchemaExecutor = autoCreateSchemaExecutor;
+ this.clusterSchemaFetchExecutor = clusterSchemaFetchExecutor;
+ }
+
+ /**
+ * Processes a single device whose path is covered by a template.
+ *
+ * @param templateSetInfo pair whose left element is the template; the right element is not read
+ * by this method
+ * @param schemaComputationWithAutoCreation supplies the device path, measurements and data
+ * types, and is handed to the cache and the schema trees for computation
+ * @return the list of missing measurement indexes reported by the last step that ran; empty
+ * when every measurement was resolved
+ */
+ List<Integer> processTemplateTimeSeries(
+ Pair<Template, PartialPath> templateSetInfo,
+ ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
+ PartialPath devicePath = schemaComputationWithAutoCreation.getDevicePath();
+ String[] measurements = schemaComputationWithAutoCreation.getMeasurements();
+ Template template = templateSetInfo.getLeft();
+ // collect the measurements (with their data types) that the template does not define
+ List<String> extensionMeasurementList = new ArrayList<>();
+ List<TSDataType> extensionDataTypeList = new ArrayList<>();
+ for (int i = 0; i < measurements.length; i++) {
+ if (!template.hasSchema(measurements[i])) {
+ extensionMeasurementList.add(measurements[i]);
+ extensionDataTypeList.add(schemaComputationWithAutoCreation.getDataType(i));
+ }
+ }
+
+ // the template is only extended when auto schema creation is enabled
+ if (!extensionMeasurementList.isEmpty() && config.isAutoCreateSchemaEnabled()) {
+ autoCreateSchemaExecutor.autoExtendTemplate(
+ template.getName(), extensionMeasurementList, extensionDataTypeList);
+ }
+
+ List<Integer> indexOfMissingMeasurements =
+ templateSchemaCache.conformsToTemplateCache(schemaComputationWithAutoCreation);
+ // all schema can be taken from cache
+ if (indexOfMissingMeasurements.isEmpty()) {
+ return indexOfMissingMeasurements;
+ }
+
+ // the cache served some, but not all, of the measurements
+ if (indexOfMissingMeasurements.size() < measurements.length) {
+ // activated but missing measurement in template
+ return indexOfMissingMeasurements;
+ }
+
+ // not activated or not cached
+ // try fetch the missing schema from remote and cache fetched schema
+ ClusterSchemaTree remoteSchemaTree =
+ clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
+ schemaComputationWithAutoCreation.getDevicePath(),
+ schemaComputationWithAutoCreation.getMeasurements(),
+ indexOfMissingMeasurements);
+ // check and compute the fetched schema
+ indexOfMissingMeasurements =
+ remoteSchemaTree.compute(schemaComputationWithAutoCreation, indexOfMissingMeasurements);
+
+ // all schema has been taken and processed
+ if (indexOfMissingMeasurements.isEmpty()) {
+ // already activated
+ return indexOfMissingMeasurements;
+ }
+
+ // not activated
+ // auto create and process the missing schema
+ if (config.isAutoCreateSchemaEnabled()) {
+ ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+ autoCreateSchemaExecutor.autoActivateTemplate(schemaTree, devicePath, template.getId());
+ indexOfMissingMeasurements =
+ schemaTree.compute(schemaComputationWithAutoCreation, indexOfMissingMeasurements);
+ }
+
+ return indexOfMissingMeasurements;
+ }
+
+ /**
+ * Processes several devices, each covered by a template, in one pass.
+ *
+ * <p>Measurements that are still unresolved after every step are reported by calling {@code
+ * computeMeasurement(index, null)} on the computation they belong to.
+ *
+ * @param templateSetInfoList template info per device; element {@code i} is used for element
+ * {@code i} of {@code schemaComputationWithAutoCreationList}
+ * @param schemaComputationWithAutoCreationList one schema computation per device
+ */
+ void processTemplateTimeSeries(
+ List<Pair<Template, PartialPath>> templateSetInfoList,
+ List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList) {
+
+ // parallel lists: indexOfMissingMeasurementsList.get(k) belongs to the device whose position
+ // in schemaComputationWithAutoCreationList is indexOfDevicesWithMissingMeasurements.get(k)
+ List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
+ List<List<Integer>> indexOfMissingMeasurementsList =
+ new ArrayList<>(schemaComputationWithAutoCreationList.size());
+
+ // extension info keyed by template name
+ Map<String, TemplateExtendInfo> extensionMeasurementMap = new HashMap<>();
+
+ ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation;
+ List<Integer> indexOfMissingMeasurements;
+ String[] measurements;
+ Template template;
+ // collect, per template, the measurements that the template does not define
+ for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
+ schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
+ template = templateSetInfoList.get(i).left;
+ measurements = schemaComputationWithAutoCreation.getMeasurements();
+ for (int j = 0; j < measurements.length; j++) {
+ if (!template.hasSchema(measurements[j])) {
+ extensionMeasurementMap
+ .computeIfAbsent(template.getName(), TemplateExtendInfo::new)
+ .addMeasurement(
+ measurements[j],
+ schemaComputationWithAutoCreation.getDataType(j),
+ getDefaultEncoding(schemaComputationWithAutoCreation.getDataType(j)),
+ TSFileDescriptor.getInstance().getConfig().getCompressor());
+ }
+ }
+ }
+ // the templates are only extended when auto schema creation is enabled
+ if (!extensionMeasurementMap.isEmpty() && config.isAutoCreateSchemaEnabled()) {
+ autoCreateSchemaExecutor.autoExtendTemplate(extensionMeasurementMap);
+ }
+
+ // check every device against the template schema cache
+ for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
+ schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
+ indexOfMissingMeasurements =
+ templateSchemaCache.conformsToTemplateCache(schemaComputationWithAutoCreation);
+ if (!indexOfMissingMeasurements.isEmpty()) {
+ indexOfDevicesWithMissingMeasurements.add(i);
+ indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
+ }
+ }
+
+ // all schema can be taken from cache
+ if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
+ return;
+ }
+
+ // try fetch the missing schema from remote
+ ClusterSchemaTree remoteSchemaTree =
+ clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getDevicePath)
+ .collect(Collectors.toList()),
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getMeasurements)
+ .collect(Collectors.toList()),
+ indexOfDevicesWithMissingMeasurements,
+ indexOfMissingMeasurementsList);
+ // check and compute the fetched schema
+ // these two lists are parallel in the same way as the pair declared at the top
+ List<Integer> indexOfDevicesNeedAutoCreateSchema = new ArrayList<>();
+ List<List<Integer>> indexOfMeasurementsNeedAutoCreate = new ArrayList<>();
+ for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
+ schemaComputationWithAutoCreation =
+ schemaComputationWithAutoCreationList.get(indexOfDevicesWithMissingMeasurements.get(i));
+ indexOfMissingMeasurements =
+ remoteSchemaTree.compute(
+ schemaComputationWithAutoCreation, indexOfMissingMeasurementsList.get(i));
+ if (!indexOfMissingMeasurements.isEmpty()) {
+ indexOfDevicesNeedAutoCreateSchema.add(indexOfDevicesWithMissingMeasurements.get(i));
+ indexOfMeasurementsNeedAutoCreate.add(indexOfMissingMeasurements);
+ }
+ }
+
+ // all schema has been taken and processed
+ if (indexOfDevicesNeedAutoCreateSchema.isEmpty()) {
+ return;
+ }
+
+ // auto create and process the missing schema
+ if (config.isAutoCreateSchemaEnabled()) {
+ ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+ autoCreateSchemaExecutor.autoActivateTemplate(
+ schemaTree,
+ indexOfDevicesNeedAutoCreateSchema.stream()
+ .map(index -> schemaComputationWithAutoCreationList.get(index).getDevicePath())
+ .collect(Collectors.toList()),
+ indexOfDevicesNeedAutoCreateSchema.stream()
+ .map(templateSetInfoList::get)
+ .collect(Collectors.toList()));
+ // rebuild the "still missing" lists from what the activation result could not resolve
+ indexOfDevicesWithMissingMeasurements = new ArrayList<>();
+ indexOfMissingMeasurementsList = new ArrayList<>();
+ for (int i = 0; i < indexOfDevicesNeedAutoCreateSchema.size(); i++) {
+ schemaComputationWithAutoCreation =
+ schemaComputationWithAutoCreationList.get(indexOfDevicesNeedAutoCreateSchema.get(i));
+ indexOfMissingMeasurements =
+ schemaTree.compute(
+ schemaComputationWithAutoCreation, indexOfMeasurementsNeedAutoCreate.get(i));
+ if (!indexOfMissingMeasurements.isEmpty()) {
+ indexOfDevicesWithMissingMeasurements.add(indexOfDevicesNeedAutoCreateSchema.get(i));
+ indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
+ }
+ }
+
+ // all schema has been taken and processed
+ if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
+ return;
+ }
+ } else {
+ // auto creation disabled: everything the remote fetch could not resolve stays missing
+ indexOfDevicesWithMissingMeasurements = indexOfDevicesNeedAutoCreateSchema;
+ indexOfMissingMeasurementsList = indexOfMeasurementsNeedAutoCreate;
+ }
+
+ // offer null for the rest missing schema processing
+ for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
+ schemaComputationWithAutoCreation =
+ schemaComputationWithAutoCreationList.get(indexOfDevicesWithMissingMeasurements.get(i));
+ for (int index : indexOfMissingMeasurementsList.get(i)) {
+ schemaComputationWithAutoCreation.computeMeasurement(index, null);
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index a3c56b19e8..18e706c47a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.model.CreateModelT
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.model.DropModelTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.model.ShowModelsTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.model.ShowTrailsTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.AlterSchemaTemplateTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.CreateSchemaTemplateTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.DeactivateSchemaTemplateTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.DropSchemaTemplateTask;
@@ -116,6 +117,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.model.CreateModelStatemen
import org.apache.iotdb.db.mpp.plan.statement.metadata.model.DropModelStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.model.ShowModelsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.model.ShowTrailsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DeactivateTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTemplateStatement;
@@ -360,6 +362,12 @@ public class ConfigTaskVisitor
return new DropSchemaTemplateTask(dropSchemaTemplateStatement);
}
+  /**
+   * Wraps an ALTER SCHEMA TEMPLATE statement in its config task, passing along the id of the
+   * query taken from the task context.
+   */
+  @Override
+  public IConfigTask visitAlterSchemaTemplate(
+      AlterSchemaTemplateStatement alterSchemaTemplateStatement, TaskContext context) {
+    final String queryId = context.getQueryId();
+    return new AlterSchemaTemplateTask(alterSchemaTemplateStatement, queryId);
+  }
+
@Override
public IConfigTask visitShowDataNodes(
ShowDataNodesStatement showDataNodesStatement, TaskContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 2dfbdef2c8..577b5b0f86 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoader;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
@@ -102,6 +103,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.template.alter.TemplateAlterOperationUtil;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
@@ -150,6 +152,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDatabaseStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.model.CreateModelStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DeactivateTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTemplateStatement;
@@ -1371,6 +1374,49 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
return future;
}
+  /**
+   * Sends an ALTER SCHEMA TEMPLATE request to the ConfigNode and reports the outcome through the
+   * returned future.
+   *
+   * <p>The RPC is re-issued for as long as the answer is OVERLAP_WITH_EXISTING_TASK; a transport
+   * timeout is treated as that same answer. Any other transport failure, as well as
+   * client-manager and Thrift errors, completes the future exceptionally.
+   *
+   * @param queryId id of the query issuing the alteration, forwarded in the request
+   * @param alterSchemaTemplateStatement statement supplying the operation type and alter info
+   * @return a future that is already completed, with either the success result or the failure
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> alterSchemaTemplate(
+      String queryId, AlterSchemaTemplateStatement alterSchemaTemplateStatement) {
+    final SettableFuture<ConfigTaskResult> resultFuture = SettableFuture.create();
+
+    final TAlterSchemaTemplateReq request = new TAlterSchemaTemplateReq();
+    request.setQueryId(queryId);
+    request.setTemplateAlterInfo(
+        TemplateAlterOperationUtil.generateExtendTemplateReqInfo(
+            alterSchemaTemplateStatement.getOperationType(),
+            alterSchemaTemplateStatement.getTemplateAlterInfo()));
+
+    try (ConfigNodeClient configNodeClient =
+        CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      TSStatus status;
+      while (true) {
+        try {
+          status = configNodeClient.alterSchemaTemplate(request);
+        } catch (TTransportException transportException) {
+          final boolean timedOut =
+              transportException.getType() == TTransportException.TIMED_OUT
+                  || transportException.getCause() instanceof SocketTimeoutException;
+          if (!timedOut) {
+            throw transportException;
+          }
+          // Time out mainly caused by slow execution, just wait
+          status = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
+        }
+        // Keep waiting until task ends
+        if (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() != status.getCode()) {
+          break;
+        }
+      }
+
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()) {
+        resultFuture.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      } else {
+        LOGGER.warn(
+            "Failed to alter schema template {} in config node, status is {}.",
+            alterSchemaTemplateStatement.getTemplateAlterInfo().getTemplateName(),
+            status);
+        resultFuture.setException(new IoTDBException(status.getMessage(), status.getCode()));
+      }
+    } catch (ClientManagerException | TException e) {
+      resultFuture.setException(e);
+    }
+    return resultFuture;
+  }
+
private ByteBuffer serializePatternListToByteBuffer(List<PartialPath> patternList) {
PathPatternTree patternTree = new PathPatternTree();
for (PartialPath pathPattern : patternList) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 559a4db82d..21f7c11ae9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDatabaseStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.model.CreateModelStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DeactivateTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTemplateStatement;
@@ -147,6 +148,9 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> dropSchemaTemplate(
DropSchemaTemplateStatement dropSchemaTemplateStatement);
+ /**
+ * Executes an ALTER SCHEMA TEMPLATE statement.
+ *
+ * @param queryId id of the query that issued the statement
+ * @param alterSchemaTemplateStatement the statement describing the alteration
+ * @return a future carrying the result of the config task
+ */
+ SettableFuture<ConfigTaskResult> alterSchemaTemplate(
+ String queryId, AlterSchemaTemplateStatement alterSchemaTemplateStatement);
+
SettableFuture<ConfigTaskResult> createPipeSink(CreatePipeSinkStatement createPipeSinkStatement);
SettableFuture<ConfigTaskResult> dropPipeSink(DropPipeSinkStatement dropPipeSinkStatement);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/template/AlterSchemaTemplateTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/template/AlterSchemaTemplateTask.java
new file mode 100644
index 0000000000..a2e9241f7b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/template/AlterSchemaTemplateTask.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata.template;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.AlterSchemaTemplateStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Config task for an {@link AlterSchemaTemplateStatement}: on execution it hands the statement,
+ * together with the id of the issuing query, to the {@link IConfigTaskExecutor}.
+ */
+public class AlterSchemaTemplateTask implements IConfigTask {
+
+  private final String queryId;
+
+  private final AlterSchemaTemplateStatement alterStatement;
+
+  public AlterSchemaTemplateTask(AlterSchemaTemplateStatement statement, String queryId) {
+    this.queryId = queryId;
+    this.alterStatement = statement;
+  }
+
+  /** Delegates to {@link IConfigTaskExecutor#alterSchemaTemplate} and returns its future. */
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    final ListenableFuture<ConfigTaskResult> pendingResult =
+        configTaskExecutor.alterSchemaTemplate(queryId, alterStatement);
+    return pendingResult;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 3a731ebfb0..056c4c7893 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.SqlConstant;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.template.TemplateAlterOperationType;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.execution.operator.window.WindowType;
import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
@@ -148,6 +149,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.model.DropModelStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.model.ShowModelsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.model.ShowTrailsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DeactivateTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTemplateStatement;
@@ -3015,6 +3017,31 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
ctx.ALIGNED() != null);
}
+  /**
+   * Builds an {@link AlterSchemaTemplateStatement} of type EXTEND_TEMPLATE from the parse tree:
+   * the template name plus, for every measurement clause, the measurement name and the attributes
+   * that {@code parseAttributeClause} appends to the data type, encoding and compressor lists.
+   */
+  @Override
+  public Statement visitAlterSchemaTemplate(IoTDBSqlParser.AlterSchemaTemplateContext ctx) {
+    final String templateName = parseIdentifier(ctx.templateName.getText());
+
+    final List<String> measurementNames = new ArrayList<>();
+    final List<TSDataType> measurementDataTypes = new ArrayList<>();
+    final List<TSEncoding> measurementEncodings = new ArrayList<>();
+    final List<CompressionType> measurementCompressors = new ArrayList<>();
+
+    for (IoTDBSqlParser.TemplateMeasurementClauseContext clause :
+        ctx.templateMeasurementClause()) {
+      final String measurementName =
+          parseNodeNameWithoutWildCard(clause.nodeNameWithoutWildcard());
+      measurementNames.add(measurementName);
+      parseAttributeClause(
+          clause.attributeClauses(),
+          measurementDataTypes,
+          measurementEncodings,
+          measurementCompressors);
+    }
+
+    return new AlterSchemaTemplateStatement(
+        templateName,
+        measurementNames,
+        measurementDataTypes,
+        measurementEncodings,
+        measurementCompressors,
+        TemplateAlterOperationType.EXTEND_TEMPLATE);
+  }
+
void parseAttributeClause(
IoTDBSqlParser.AttributeClausesContext ctx,
List<TSDataType> dataTypes,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
index 324ac78bf5..4bf8190495 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
@@ -98,6 +98,7 @@ public enum StatementType {
CREATE_TEMPLATE,
SET_TEMPLATE,
ACTIVATE_TEMPLATE,
+ ALTER_TEMPLATE,
MERGE,
FULL_MERGE,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 2e2ac4ff11..5e4673ed7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -78,6 +78,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.model.DropModelStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.model.ShowModelsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.model.ShowTrailsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.AlterSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.BatchActivateTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DeactivateTemplateStatement;
@@ -434,6 +435,11 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(showPathsUsingTemplateStatement, context);
}
+ /**
+ * Visits an {@link AlterSchemaTemplateStatement}. This default implementation falls back to
+ * {@code visitStatement}.
+ */
+ public R visitAlterSchemaTemplate(
+ AlterSchemaTemplateStatement alterSchemaTemplateStatement, C context) {
+ return visitStatement(alterSchemaTemplateStatement, context);
+ }
+
public R visitShowPipeSink(ShowPipeSinkStatement showPipeSinkStatement, C context) {
return visitStatement(showPipeSinkStatement, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/template/AlterSchemaTemplateStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/template/AlterSchemaTemplateStatement.java
new file mode 100644
index 0000000000..9a501ee621
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/template/AlterSchemaTemplateStatement.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata.template;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.template.TemplateAlterOperationType;
+import org.apache.iotdb.db.metadata.template.alter.TemplateAlterInfo;
+import org.apache.iotdb.db.metadata.template.alter.TemplateExtendInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.List;
+
+/**
+ * Statement describing an alteration of a schema template.
+ *
+ * <p>It carries the kind of alteration ({@link TemplateAlterOperationType}) and the matching
+ * {@link TemplateAlterInfo}. Only {@code EXTEND_TEMPLATE} produces alter info here; for any other
+ * operation type the alter info stays {@code null}.
+ */
+public class AlterSchemaTemplateStatement extends Statement implements IConfigStatement {
+
+  private TemplateAlterInfo templateAlterInfo;
+
+  private TemplateAlterOperationType operationType;
+
+  /** Creates an empty statement whose statement type is {@code ALTER_TEMPLATE}. */
+  public AlterSchemaTemplateStatement() {
+    super();
+    statementType = StatementType.ALTER_TEMPLATE;
+  }
+
+  /**
+   * Creates a statement for the given operation.
+   *
+   * @param templateName name of the template to alter
+   * @param measurements measurements named by the alteration
+   * @param dataTypes data types, passed to the alter info alongside {@code measurements}
+   * @param encodings encodings, passed to the alter info alongside {@code measurements}
+   * @param compressors compressors, passed to the alter info alongside {@code measurements}
+   * @param operationType kind of alteration; must not be {@code null}
+   */
+  public AlterSchemaTemplateStatement(
+      String templateName,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      TemplateAlterOperationType operationType) {
+    this();
+    if (operationType.equals(TemplateAlterOperationType.EXTEND_TEMPLATE)) {
+      this.templateAlterInfo =
+          new TemplateExtendInfo(templateName, measurements, dataTypes, encodings, compressors);
+    }
+    this.operationType = operationType;
+  }
+
+  public TemplateAlterInfo getTemplateAlterInfo() {
+    return templateAlterInfo;
+  }
+
+  public TemplateAlterOperationType getOperationType() {
+    return operationType;
+  }
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  /**
+   * Returns the paths this statement touches.
+   *
+   * @return always an empty list; an empty list rather than {@code null} so that callers can
+   *     iterate or call {@code isEmpty()} without a null check
+   */
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    // Fully qualified so that no additional import is needed in this file.
+    return java.util.Collections.emptyList();
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitAlterSchemaTemplate(this, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index c63fff0c93..f29dddcac5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -414,9 +414,16 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
@Override
public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
- DataNodeSchemaCache.getInstance().invalidateAll();
- DataNodeTemplateSchemaCache.getInstance().invalidateCache();
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    // Invalidate both caches while holding both write locks. The locks are taken in the order
+    // schema cache -> template schema cache and released in reverse. Each acquisition is paired
+    // with its own finally block, so a failure while taking the second lock or while releasing
+    // one of them cannot leave the other lock held.
+    DataNodeSchemaCache.getInstance().takeWriteLock();
+    try {
+      DataNodeTemplateSchemaCache.getInstance().takeWriteLock();
+      try {
+        DataNodeSchemaCache.getInstance().invalidateAll();
+        DataNodeTemplateSchemaCache.getInstance().invalidateCache();
+        return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      } finally {
+        DataNodeTemplateSchemaCache.getInstance().releaseWriteLock();
+      }
+    } finally {
+      DataNodeSchemaCache.getInstance().releaseWriteLock();
+    }
}
@Override
@@ -1242,6 +1249,9 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
case COMMIT_TEMPLATE_SET_INFO:
ClusterTemplateManager.getInstance().commitTemplatePreSetInfo(req.getTemplateInfo());
break;
+ case UPDATE_TEMPLATE_INFO:
+ ClusterTemplateManager.getInstance().updateTemplateInfo(req.getTemplateInfo());
+ break;
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index dbc5ac6fe2..0a2d0fb57a 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -559,6 +559,11 @@ struct TCreateSchemaTemplateReq {
2: required binary serializedTemplate
}
+// Request of IConfigNodeRPCService.alterSchemaTemplate.
+// queryId: id of the query that issued the alteration.
+// templateAlterInfo: serialized alteration, built on the DataNode by
+// TemplateAlterOperationUtil.generateExtendTemplateReqInfo from the operation type and the
+// template alter info.
+struct TAlterSchemaTemplateReq {
+ 1: required string queryId
+ 2: required binary templateAlterInfo
+}
+
struct TGetAllTemplatesResp {
1: required common.TSStatus status
2: optional list<binary> templateList
@@ -1232,6 +1237,8 @@ service IConfigNodeRPCService {
*/
common.TSStatus dropSchemaTemplate(string req)
+ /**
+ * Alter a schema template as described by the request.
+ *
+ * The DataNode caller re-sends the request for as long as the returned status is
+ * OVERLAP_WITH_EXISTING_TASK.
+ */
+ common.TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req)
+
/**
* Generate a set of DeleteTimeSeriesProcedure to delete some specific TimeSeries
*