You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/04/20 02:09:28 UTC
[iotdb] branch master updated: [IOTDB-4837] Refactor Set Schema Template (#9637)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b02502fa14 [IOTDB-4837] Refactor Set Schema Template (#9637)
b02502fa14 is described below
commit b02502fa143957941d4abc221dde0a2b6c977ad5
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Thu Apr 20 10:09:17 2023 +0800
[IOTDB-4837] Refactor Set Schema Template (#9637)
---
.../confignode/client/DataNodeRequestType.java | 1 +
.../client/async/AsyncDataNodeClientPool.java | 8 +
.../client/async/handlers/AsyncClientHandler.java | 10 +
.../rpc/CheckTimeSeriesExistenceRPCHandler.java | 87 ++++
.../consensus/request/ConfigPhysicalPlan.java | 8 +
.../consensus/request/ConfigPhysicalPlanType.java | 2 +
.../template/CommitSetSchemaTemplatePlan.java | 80 +++
.../write/template/PreSetSchemaTemplatePlan.java | 80 +++
.../confignode/manager/ClusterSchemaManager.java | 84 +---
.../iotdb/confignode/manager/ConfigManager.java | 6 +-
.../iotdb/confignode/manager/ProcedureManager.java | 44 ++
.../persistence/executor/ConfigPlanExecutor.java | 7 +
.../persistence/schema/ClusterSchemaInfo.java | 96 +++-
.../persistence/schema/TemplatePreSetTable.java | 195 ++++++++
.../persistence/schema/TemplateTable.java | 1 -
.../impl/schema/SetTemplateProcedure.java | 534 +++++++++++++++++++++
.../procedure/state/schema/SetTemplateState.java | 28 +-
.../procedure/store/ProcedureFactory.java | 6 +
.../confignode/procedure/store/ProcedureType.java | 1 +
.../{ => schema}/ClusterSchemaInfoTest.java | 3 +-
.../schema/TemplatePreSetTableTest.java | 114 +++++
.../{ => schema}/TemplateTableTest.java | 3 +-
.../db/it/schema/IoTDBDeactivateTemplateIT.java | 3 +-
.../iotdb/db/it/schema/IoTDBSchemaTemplateIT.java | 95 +++-
.../org/apache/iotdb/commons/path/PartialPath.java | 4 +-
.../apache/iotdb/commons/path/PartialPathTest.java | 2 +-
.../schemaregion/rocksdb/RSchemaRegion.java | 2 +-
.../metadata/tagSchemaRegion/TagSchemaRegion.java | 2 +-
.../template/TemplateImcompatibeException.java | 14 +-
.../iotdb/db/metadata/mtree/ConfigMTree.java | 4 +
.../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 8 -
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 8 -
.../db/metadata/schemaregion/ISchemaRegion.java | 2 +-
.../db/metadata/schemaregion/SchemaEngine.java | 8 +-
.../schemaregion/SchemaRegionMemoryImpl.java | 2 +-
.../schemaregion/SchemaRegionSchemaFileImpl.java | 2 +-
.../metadata/template/ClusterTemplateManager.java | 242 +++++++++-
.../db/metadata/template/ITemplateManager.java | 7 +-
.../template/TemplateInternalRPCUpdateType.java | 17 +-
.../metadata/template/TemplateInternalRPCUtil.java | 40 +-
.../operator/schema/SchemaQueryScanOperator.java | 2 +-
.../schema/source/SchemaSourceFactory.java | 5 +
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 55 +--
.../plan/analyze/schema/ClusterSchemaFetcher.java | 10 +-
.../db/mpp/plan/analyze/schema/ISchemaFetcher.java | 5 +-
.../plan/execution/config/ConfigTaskVisitor.java | 2 +-
.../config/executor/ClusterConfigTaskExecutor.java | 12 +-
.../config/executor/IConfigTaskExecutor.java | 2 +-
.../metadata/template/SetSchemaTemplateTask.java | 7 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 107 ++++-
.../schemaRegion/SchemaRegionManagementTest.java | 4 +-
.../schema/SchemaQueryScanOperatorTest.java | 4 +-
.../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 8 +-
.../iotdb/db/mpp/plan/plan/distribution/Util.java | 8 +-
.../src/main/thrift/confignode.thrift | 5 +-
thrift/src/main/thrift/datanode.thrift | 12 +
56 files changed, 1841 insertions(+), 267 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 25f18a59a9..56ec55d53f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -93,6 +93,7 @@ public enum DataNodeRequestType {
ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
DEACTIVATE_TEMPLATE,
COUNT_PATHS_USING_TEMPLATE,
+ CHECK_TIMESERIES_EXISTENCE,
/** @TODO Need to migrate to 'Node Maintenance' */
KILL_QUERY_INSTANCE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index c641cec211..ace4d0268b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -33,10 +33,12 @@ import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.DeleteSchemaRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListWithTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
@@ -343,6 +345,12 @@ public class AsyncDataNodeClientPool {
(CountPathsUsingTemplateRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
+ case CHECK_TIMESERIES_EXISTENCE:
+ client.checkTimeSeriesExistence(
+ (TCheckTimeSeriesExistenceReq) clientHandler.getRequest(requestId),
+ (CheckTimeSeriesExistenceRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
case KILL_QUERY_INSTANCE:
client.killQueryInstance(
(String) clientHandler.getRequest(requestId),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index b807b52c5a..f54c0e3977 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -23,9 +23,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.handlers.rpc.AbstractAsyncRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.DeleteSchemaRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
@@ -183,6 +185,14 @@ public class AsyncClientHandler<Q, R> {
dataNodeLocationMap,
(Map<Integer, TCountPathsUsingTemplateResp>) responseMap,
countDownLatch);
+ case CHECK_TIMESERIES_EXISTENCE:
+ return new CheckTimeSeriesExistenceRPCHandler(
+ requestType,
+ requestId,
+ targetDataNode,
+ dataNodeLocationMap,
+ (Map<Integer, TCheckTimeSeriesExistenceResp>) responseMap,
+ countDownLatch);
case SET_TTL:
case CREATE_DATA_REGION:
case CREATE_SCHEMA_REGION:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
new file mode 100644
index 0000000000..88b4ee56f7
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class CheckTimeSeriesExistenceRPCHandler
+ extends AbstractAsyncRPCHandler<TCheckTimeSeriesExistenceResp> {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(CountPathsUsingTemplateRPCHandler.class);
+
+ public CheckTimeSeriesExistenceRPCHandler(
+ DataNodeRequestType requestType,
+ int requestId,
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ Map<Integer, TCheckTimeSeriesExistenceResp> responseMap,
+ CountDownLatch countDownLatch) {
+ super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
+ }
+
+ @Override
+ public void onComplete(TCheckTimeSeriesExistenceResp response) {
+ TSStatus tsStatus = response.getStatus();
+ responseMap.put(requestId, response);
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataNodeLocationMap.remove(requestId);
+ LOGGER.info("Successfully check timeseries existence on DataNode: {}", targetDataNode);
+ } else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ dataNodeLocationMap.remove(requestId);
+ LOGGER.error(
+ "Failed to check timeseries existence on DataNode {}, {}", targetDataNode, tsStatus);
+ } else {
+ LOGGER.error(
+ "Failed to check timeseries existence on DataNode {}, {}", targetDataNode, tsStatus);
+ }
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ String errorMsg =
+ "Check timeseries existence error on DataNode: {id="
+ + targetDataNode.getDataNodeId()
+ + ", internalEndPoint="
+ + targetDataNode.getInternalEndPoint()
+ + "}"
+ + e.getMessage();
+ LOGGER.error(errorMsg);
+
+ countDownLatch.countDown();
+ TCheckTimeSeriesExistenceResp resp = new TCheckTimeSeriesExistenceResp();
+ resp.setStatus(
+ new TSStatus(
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
+ responseMap.put(requestId, resp);
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 87e2a32258..ce1eda69d1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -96,8 +96,10 @@ import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePla
import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlanV1;
import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.PreSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
@@ -318,6 +320,12 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case SetSchemaTemplate:
plan = new SetSchemaTemplatePlan();
break;
+ case PreSetSchemaTemplate:
+ plan = new PreSetSchemaTemplatePlan();
+ break;
+ case CommitSetSchemaTemplate:
+ plan = new CommitSetSchemaTemplatePlan();
+ break;
case GetTemplateSetInfo:
plan = new GetTemplateSetInfoPlan();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 32efc881a7..e3408eb7e0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -113,6 +113,8 @@ public enum ConfigPhysicalPlanType {
RollbackUnsetTemplate((short) 809),
UnsetTemplate((short) 810),
DropSchemaTemplate((short) 811),
+ PreSetSchemaTemplate((short) 812),
+ CommitSetSchemaTemplate((short) 813),
/** Deprecated types for sync, restored them for upgrade */
@Deprecated
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/template/CommitSetSchemaTemplatePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/template/CommitSetSchemaTemplatePlan.java
new file mode 100644
index 0000000000..37ad3faefc
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/template/CommitSetSchemaTemplatePlan.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.template;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class CommitSetSchemaTemplatePlan extends ConfigPhysicalPlan { // consensus plan of type CommitSetSchemaTemplate
+
+ private String name; // presumably the schema template name - confirm against callers
+ private String path; // presumably the path the template is set on - confirm against callers
+
+ private boolean isRollback = false; // defaults to false; only the three-argument constructor or deserialization changes it
+
+ public CommitSetSchemaTemplatePlan() { // no-arg form: fields are filled in later by deserializeImpl
+ super(ConfigPhysicalPlanType.CommitSetSchemaTemplate);
+ }
+
+ public CommitSetSchemaTemplatePlan(String name, String path) { // isRollback keeps its default (false)
+ super(ConfigPhysicalPlanType.CommitSetSchemaTemplate);
+ this.name = name;
+ this.path = path;
+ }
+
+ public CommitSetSchemaTemplatePlan(String name, String path, boolean isRollback) { // explicit rollback flag
+ super(ConfigPhysicalPlanType.CommitSetSchemaTemplate);
+ this.name = name;
+ this.path = path;
+ this.isRollback = isRollback;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public boolean isRollback() {
+ return isRollback;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException { // layout: plan type (short), name, path, isRollback
+ stream.writeShort(getType().getPlanType());
+ ReadWriteIOUtils.write(name, stream);
+ ReadWriteIOUtils.write(path, stream);
+ ReadWriteIOUtils.write(isRollback, stream);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException { // reads name, path, isRollback in write order; the plan-type short is not read here - presumably consumed by the caller, TODO confirm
+ this.name = ReadWriteIOUtils.readString(buffer);
+ this.path = ReadWriteIOUtils.readString(buffer);
+ this.isRollback = ReadWriteIOUtils.readBool(buffer);
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/template/PreSetSchemaTemplatePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/template/PreSetSchemaTemplatePlan.java
new file mode 100644
index 0000000000..4573ed524c
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/template/PreSetSchemaTemplatePlan.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.template;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class PreSetSchemaTemplatePlan extends ConfigPhysicalPlan { // consensus plan of type PreSetSchemaTemplate
+
+ private String name; // presumably the schema template name - confirm against callers
+ private String path; // presumably the path the template is set on - confirm against callers
+
+ private boolean isRollback = false; // defaults to false; only the three-argument constructor or deserialization changes it
+
+ public PreSetSchemaTemplatePlan() { // no-arg form: fields are filled in later by deserializeImpl
+ super(ConfigPhysicalPlanType.PreSetSchemaTemplate);
+ }
+
+ public PreSetSchemaTemplatePlan(String name, String path) { // isRollback keeps its default (false)
+ super(ConfigPhysicalPlanType.PreSetSchemaTemplate);
+ this.name = name;
+ this.path = path;
+ }
+
+ public PreSetSchemaTemplatePlan(String name, String path, boolean isRollback) { // explicit rollback flag
+ super(ConfigPhysicalPlanType.PreSetSchemaTemplate);
+ this.name = name;
+ this.path = path;
+ this.isRollback = isRollback;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public boolean isRollback() {
+ return isRollback;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException { // layout: plan type (short), name, path, isRollback
+ stream.writeShort(getType().getPlanType());
+ ReadWriteIOUtils.write(name, stream);
+ ReadWriteIOUtils.write(path, stream);
+ ReadWriteIOUtils.write(isRollback, stream);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException { // reads name, path, isRollback in write order; the plan-type short is not read here - presumably consumed by the caller, TODO confirm
+ this.name = ReadWriteIOUtils.readString(buffer);
+ this.path = ReadWriteIOUtils.readString(buffer);
+ this.isRollback = ReadWriteIOUtils.readBool(buffer);
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 1ac6823635..b73cceb573 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
@@ -30,12 +29,10 @@ import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
-import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
@@ -52,7 +49,6 @@ import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchema
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan;
-import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.UnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
@@ -74,9 +70,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUpdateType;
-import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUtil;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -617,80 +610,6 @@ public class ClusterSchemaManager {
return resp;
}
- /** mount template */
- public synchronized TSStatus setSchemaTemplate(String templateName, String path) {
- // check whether the template can be set on given path
- CheckTemplateSettablePlan checkTemplateSettablePlan =
- new CheckTemplateSettablePlan(templateName, path);
- TemplateInfoResp resp =
- (TemplateInfoResp) getConsensusManager().read(checkTemplateSettablePlan).getDataset();
- if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return resp.getStatus();
- }
-
- Template template = resp.getTemplateList().get(0);
-
- // prepare req
- TUpdateTemplateReq req = new TUpdateTemplateReq();
- req.setType(TemplateInternalRPCUpdateType.ADD_TEMPLATE_SET_INFO.toByte());
- req.setTemplateInfo(TemplateInternalRPCUtil.generateAddTemplateSetInfoBytes(template, path));
-
- // sync template set info to all dataNodes
- TSStatus status;
- List<TDataNodeConfiguration> allDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes();
- for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
- status =
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- dataNodeInfo.getLocation().getInternalEndPoint(),
- req,
- DataNodeRequestType.UPDATE_TEMPLATE);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // roll back the synced cache on dataNodes
- return status.setSubStatus(rollbackTemplateSetInfoSync(template.getId(), path));
- }
- }
-
- // execute set operation on configNode
- SetSchemaTemplatePlan setSchemaTemplatePlan = new SetSchemaTemplatePlan(templateName, path);
- status = getConsensusManager().write(setSchemaTemplatePlan).getStatus();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return status;
- } else {
- // roll back the synced cache on dataNodes
- return status.setSubStatus(rollbackTemplateSetInfoSync(template.getId(), path));
- }
- }
-
- private List<TSStatus> rollbackTemplateSetInfoSync(int templateId, String path) {
- // construct the rollbackReq
- TUpdateTemplateReq rollbackReq = new TUpdateTemplateReq();
- rollbackReq.setType(TemplateInternalRPCUpdateType.INVALIDATE_TEMPLATE_SET_INFO.toByte());
- rollbackReq.setTemplateInfo(
- TemplateInternalRPCUtil.generateInvalidateTemplateSetInfoBytes(templateId, path));
-
- // get all dataNodes
- List<TDataNodeConfiguration> allDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes();
-
- // send rollbackReq
- TSStatus status;
- List<TSStatus> failedRollbackStatusList = new ArrayList<>();
- for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
- status =
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- dataNodeInfo.getLocation().getInternalEndPoint(),
- rollbackReq,
- DataNodeRequestType.UPDATE_TEMPLATE);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- failedRollbackStatusList.add(status);
- }
- }
- return failedRollbackStatusList;
- }
-
/** show path set template xx */
public TGetPathsSetTemplatesResp getPathsSetTemplate(String templateName) {
GetPathsSetTemplatePlan getPathsSetTemplatePlan = new GetPathsSetTemplatePlan(templateName);
@@ -707,7 +626,8 @@ public class ClusterSchemaManager {
}
/**
- * get all template set info to sync to all dataNodes, the pre unset template info won't be taken
+ * Get all template set and pre-set info to sync to a registering DataNode; the pre-unset
+ * template info won't be taken.
*/
public byte[] getAllTemplateSetInfo() {
AllTemplateSetInfoResp resp =
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index c6ac13164d..52a3ccb279 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1442,10 +1442,10 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) {
+ public synchronized TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return clusterSchemaManager.setSchemaTemplate(req.getName(), req.getPath());
+ return procedureManager.setSchemaTemplate(req.getQueryId(), req.getName(), req.getPath());
} else {
return status;
}
@@ -1512,7 +1512,7 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus unsetSchemaTemplate(TUnsetSchemaTemplateReq req) {
+ public synchronized TSStatus unsetSchemaTemplate(TUnsetSchemaTemplateReq req) {
TSStatus status = confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 93d73b507b..8318cd639f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
+import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
@@ -215,6 +216,49 @@ public class ProcedureManager {
}
}
+ public TSStatus setSchemaTemplate(String queryId, String templateName, String templateSetPath) { // submits (or re-attaches to) a SetTemplateProcedure and blocks until it finishes
+ long procedureId = -1; // -1 means no existing procedure for this queryId was found
+ synchronized (this) { // scan and submit under one lock so two callers cannot both pass the checks
+ boolean hasOverlappedTask = false;
+ ProcedureType type;
+ SetTemplateProcedure setTemplateProcedure;
+ for (Procedure<?> procedure : executor.getProcedures().values()) {
+ type = ProcedureFactory.getProcedureType(procedure);
+ if (type == null || !type.equals(ProcedureType.SET_TEMPLATE_PROCEDURE)) {
+ continue; // only set-template procedures are relevant here
+ }
+ setTemplateProcedure = (SetTemplateProcedure) procedure;
+ if (queryId.equals(setTemplateProcedure.getQueryId())) { // same queryId: wait on the already-submitted procedure instead of submitting again
+ procedureId = setTemplateProcedure.getProcId();
+ break;
+ }
+ if (templateSetPath.equals(setTemplateProcedure.getTemplateSetPath())) { // exact string match on the path only
+ hasOverlappedTask = true;
+ break;
+ }
+ }
+
+ if (procedureId == -1) {
+ if (hasOverlappedTask) { // a procedure from a different query is registered for the same path
+ return RpcUtils.getStatus(
+ TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
+ "Some other task is setting template on target path.");
+ }
+ procedureId =
+ this.executor.submitProcedure(
+ new SetTemplateProcedure(queryId, templateName, templateSetPath));
+ }
+ }
+ List<TSStatus> procedureStatus = new ArrayList<>(); // passed to waitingProcedureFinished; element 0 is returned on failure
+ boolean isSucceed =
+ waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus); // waiting happens outside the synchronized block
+ if (isSucceed) {
+ return StatusUtils.OK;
+ } else {
+ return procedureStatus.get(0); // only one procedure was waited on, so index 0 is its status
+ }
+ }
+
public TSStatus deactivateTemplate(
String queryId, Map<PartialPath, List<Template>> templateSetInfo) {
long procedureId = -1;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 1721eef36e..bbbe6a879a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -86,8 +86,10 @@ import org.apache.iotdb.confignode.consensus.request.write.quota.SetThrottleQuot
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.PreSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
@@ -370,6 +372,11 @@ public class ConfigPlanExecutor {
return partitionInfo.updateRegionLocation((UpdateRegionLocationPlan) physicalPlan);
case SetSchemaTemplate:
return clusterSchemaInfo.setSchemaTemplate((SetSchemaTemplatePlan) physicalPlan);
+ case PreSetSchemaTemplate:
+ return clusterSchemaInfo.preSetSchemaTemplate((PreSetSchemaTemplatePlan) physicalPlan);
+ case CommitSetSchemaTemplate:
+ return clusterSchemaInfo.commitSetSchemaTemplate(
+ (CommitSetSchemaTemplatePlan) physicalPlan);
case PreUnsetTemplate:
return clusterSchemaInfo.preUnsetSchemaTemplate((PreUnsetSchemaTemplatePlan) physicalPlan);
case RollbackUnsetTemplate:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
index 6f05cff326..ea28545b8b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
@@ -40,8 +40,10 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetDataRepli
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.PreSetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
@@ -101,12 +103,15 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
private final TemplateTable templateTable;
+ private final TemplatePreSetTable templatePreSetTable;
+
public ClusterSchemaInfo() throws IOException {
databaseReadWriteLock = new ReentrantReadWriteLock();
try {
mTree = new ConfigMTree();
templateTable = new TemplateTable();
+ templatePreSetTable = new TemplatePreSetTable();
} catch (MetadataException e) {
LOGGER.error("Can't construct ClusterSchemaInfo", e);
throw new IOException(e);
@@ -551,7 +556,9 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
@Override
public boolean processTakeSnapshot(File snapshotDir) throws IOException {
- return processMTreeTakeSnapshot(snapshotDir) && templateTable.processTakeSnapshot(snapshotDir);
+ return processMTreeTakeSnapshot(snapshotDir)
+ && templateTable.processTakeSnapshot(snapshotDir)
+ && templatePreSetTable.processTakeSnapshot(snapshotDir);
}
public boolean processMTreeTakeSnapshot(File snapshotDir) throws IOException {
@@ -592,6 +599,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
public void processLoadSnapshot(File snapshotDir) throws IOException {
processMTreeLoadSnapshot(snapshotDir);
templateTable.processLoadSnapshot(snapshotDir);
+ templatePreSetTable.processLoadSnapshot(snapshotDir);
}
public void processMTreeLoadSnapshot(File snapshotDir) throws IOException {
@@ -719,13 +727,84 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
try {
int templateId = templateTable.getTemplate(setSchemaTemplatePlan.getName()).getId();
- mTree.getNodeWithAutoCreate(path).setSchemaTemplateId(templateId);
+ mTree.setTemplate(templateId, path);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (MetadataException e) {
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ }
+ }
+
+ /**
+ * Applies a PreSetSchemaTemplatePlan: resolves the template id from the template name carried by
+ * the plan and either pre-sets the template on the plan's path or, when the plan is flagged as a
+ * rollback, undoes a previous pre-set.
+ *
+ * @param preSetSchemaTemplatePlan plan providing the template name, the target path and the
+ * rollback flag
+ * @return a SUCCESS_STATUS status when the operation completes; otherwise a status built from the
+ * error code and message of the IllegalPathException or MetadataException that was raised
+ */
+ public synchronized TSStatus preSetSchemaTemplate(
+ PreSetSchemaTemplatePlan preSetSchemaTemplatePlan) {
+ PartialPath path;
+ try {
+ path = new PartialPath(preSetSchemaTemplatePlan.getPath());
+ } catch (IllegalPathException e) {
+ // Only the message is logged here; the stack trace is not recorded.
+ LOGGER.error(e.getMessage());
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ }
+
+ try {
+ int templateId = templateTable.getTemplate(preSetSchemaTemplatePlan.getName()).getId();
+ // The same plan type carries both directions; the rollback flag selects which one runs.
+ if (preSetSchemaTemplatePlan.isRollback()) {
+ rollbackPreSetSchemaTemplate(templateId, path);
+ } else {
+ preSetSchemaTemplate(templateId, path);
+ }
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (MetadataException e) {
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
+ // Marks (templateId, templateSetPath) as pre-set in the pre-set table first, then sets the
+ // template on the MTree.
+ // NOTE(review): if mTree.setTemplate throws, the pre-set table entry added just before stays in
+ // place; presumably the procedure's rollback of the pre-set step removes it - confirm.
+ private void preSetSchemaTemplate(int templateId, PartialPath templateSetPath)
+ throws MetadataException {
+ templatePreSetTable.preSetTemplate(templateId, templateSetPath);
+ mTree.setTemplate(templateId, templateSetPath);
+ }
+
+ // Undoes a pre-set: unsets the template on the MTree on a best-effort basis (a MetadataException
+ // from the MTree is deliberately ignored) and then always removes the pair from the pre-set
+ // table. Although the signature declares MetadataException, the only call that can raise it in
+ // this body is wrapped in the catch below.
+ private void rollbackPreSetSchemaTemplate(int templateId, PartialPath templateSetPath)
+ throws MetadataException {
+ try {
+ mTree.unsetTemplate(templateId, templateSetPath);
+ } catch (MetadataException ignore) {
+ // node not exists or not set template
+ }
+ templatePreSetTable.removeSetTemplate(templateId, templateSetPath);
+ }
+
+ public synchronized TSStatus commitSetSchemaTemplate(
+ CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan) {
+ PartialPath path;
+ try {
+ path = new PartialPath(commitSetSchemaTemplatePlan.getPath());
+ } catch (IllegalPathException e) {
+ LOGGER.error(e.getMessage());
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ }
+
+ try {
+ int templateId = templateTable.getTemplate(commitSetSchemaTemplatePlan.getName()).getId();
+ if (commitSetSchemaTemplatePlan.isRollback()) {
+ rollbackCommitSetSchemaTemplate(templateId, path);
+ } else {
+ commitSetSchemaTemplate(templateId, path);
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (MetadataException e) {
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ }
+ }
+
+ // Committing only drops the pair from the pre-set table; the MTree is not touched here.
+ private void commitSetSchemaTemplate(int templateId, PartialPath templateSetPath) {
+ templatePreSetTable.removeSetTemplate(templateId, templateSetPath);
+ }
+
+ // Reverts a commit by unsetting the template on the MTree; a MetadataException from the MTree is
+ // propagated to the caller.
+ // NOTE(review): the pre-set table is not modified here - confirm that no entry needs restoring.
+ private void rollbackCommitSetSchemaTemplate(int templateId, PartialPath templateSetPath)
+ throws MetadataException {
+ mTree.unsetTemplate(templateId, templateSetPath);
+ }
+
public PathInfoResp getPathsSetTemplate(GetPathsSetTemplatePlan getPathsSetTemplatePlan) {
PathInfoResp pathInfoResp = new PathInfoResp();
TSStatus status;
@@ -748,21 +827,26 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
public AllTemplateSetInfoResp getAllTemplateSetInfo() {
List<Template> templateList = templateTable.getAllTemplate();
- Map<Integer, List<String>> templateSetInfo = new HashMap<>();
+ Map<Integer, List<Pair<String, Boolean>>> templateSetInfo = new HashMap<>();
int id;
for (Template template : templateList) {
id = template.getId();
try {
List<String> pathList = mTree.getPathsSetOnTemplate(id, true);
if (!pathList.isEmpty()) {
- templateSetInfo.put(id, pathList);
+ List<Pair<String, Boolean>> pathSetInfoList = new ArrayList<>();
+ for (String path : pathList) {
+ pathSetInfoList.add(
+ new Pair<>(path, templatePreSetTable.isPreSet(id, new PartialPath(path))));
+ }
+ templateSetInfo.put(id, pathSetInfoList);
}
} catch (MetadataException e) {
LOGGER.error("Error occurred when get paths set on template {}", id, e);
}
}
- Map<Template, List<String>> templateSetInfoMap = new HashMap<>();
+ Map<Template, List<Pair<String, Boolean>>> templateSetInfoMap = new HashMap<>();
for (Template template : templateList) {
if (templateSetInfo.containsKey(template.getId())) {
templateSetInfoMap.put(template, templateSetInfo.get(template.getId()));
@@ -770,7 +854,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
}
return new AllTemplateSetInfoResp(
- TemplateInternalRPCUtil.generateAddTemplateSetInfoBytes(templateSetInfoMap));
+ TemplateInternalRPCUtil.generateAddAllTemplateSetInfoBytes(templateSetInfoMap));
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplatePreSetTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplatePreSetTable.java
new file mode 100644
index 0000000000..2c0a3754ef
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplatePreSetTable.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence.schema;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+ /**
+  * Table of schema-template "set" operations that have been pre-set on a path but not yet
+  * committed, keyed by template id.
+  *
+  * <p>Every public operation takes the table's read/write lock. The table can be persisted to, and
+  * restored from, a single snapshot file named {@code template_preset_info.bin}. An empty table is
+  * persisted by writing no file at all.
+  */
+ public class TemplatePreSetTable {
+
+   private static final Logger LOGGER = LoggerFactory.getLogger(TemplatePreSetTable.class);
+
+   private static final String SNAPSHOT_FILENAME = "template_preset_info.bin";
+
+   private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+   // template id -> paths on which that template is currently pre-set
+   private final Map<Integer, Set<PartialPath>> templatePreSetMap = new ConcurrentHashMap<>();
+
+   public TemplatePreSetTable() {}
+
+   /**
+    * Tells whether the given template is currently pre-set on the given path.
+    *
+    * @param templateId id of the template
+    * @param templateSetPath path to look up
+    * @return {@code true} if the pair is recorded in this table
+    */
+   public boolean isPreSet(int templateId, PartialPath templateSetPath) {
+     readWriteLock.readLock().lock();
+     try {
+       Set<PartialPath> templatePreSetPaths = templatePreSetMap.get(templateId);
+       return templatePreSetPaths != null && templatePreSetPaths.contains(templateSetPath);
+     } finally {
+       readWriteLock.readLock().unlock();
+     }
+   }
+
+   /**
+    * Records that the given template is pre-set on the given path.
+    *
+    * @param templateId id of the template
+    * @param templateSetPath path the template is being set on
+    */
+   public void preSetTemplate(int templateId, PartialPath templateSetPath) {
+     readWriteLock.writeLock().lock();
+     try {
+       templatePreSetMap
+           .computeIfAbsent(templateId, k -> Collections.synchronizedSet(new HashSet<>()))
+           .add(templateSetPath);
+     } finally {
+       readWriteLock.writeLock().unlock();
+     }
+   }
+
+   /**
+    * Removes the given pair from the table, dropping the template's entry once its last path is
+    * gone.
+    *
+    * @param templateId id of the template
+    * @param templateSetPath path to remove
+    * @return {@code true} if the pair was present and has been removed
+    */
+   public boolean removeSetTemplate(int templateId, PartialPath templateSetPath) {
+     readWriteLock.writeLock().lock();
+     try {
+       Set<PartialPath> set = templatePreSetMap.get(templateId);
+       if (set == null || !set.remove(templateSetPath)) {
+         return false;
+       }
+       if (set.isEmpty()) {
+         templatePreSetMap.remove(templateId);
+       }
+       return true;
+     } finally {
+       readWriteLock.writeLock().unlock();
+     }
+   }
+
+   /**
+    * Writes the table into {@code snapshotDir}. Nothing is written when the table is empty.
+    *
+    * @param snapshotDir directory that receives the snapshot file
+    * @return {@code true} if the table is empty or the snapshot file was produced; {@code false}
+    *     if the snapshot file already exists or the temporary file could not be renamed
+    * @throws IOException if writing the temporary file fails
+    */
+   public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+     readWriteLock.writeLock().lock();
+     try {
+       if (templatePreSetMap.isEmpty()) {
+         return true;
+       }
+
+       File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
+       if (snapshotFile.exists() && snapshotFile.isFile()) {
+         LOGGER.error(
+             "Failed to take snapshot of TemplatePreSetTable, because snapshot file [{}] is already exist.",
+             snapshotFile.getAbsolutePath());
+         return false;
+       }
+       File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
+
+       try {
+         try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
+             BufferedOutputStream outputStream = new BufferedOutputStream(fileOutputStream)) {
+           serialize(outputStream);
+           outputStream.flush();
+         }
+         // Both streams are closed at this point, so the rename never runs on an open file.
+         return tmpFile.renameTo(snapshotFile);
+       } finally {
+         // After a successful rename the temporary file no longer exists and the loop exits at
+         // once; otherwise try a few times to clean it up.
+         for (int retry = 0; retry < 5; retry++) {
+           if (!tmpFile.exists() || tmpFile.delete()) {
+             break;
+           } else {
+             LOGGER.warn(
+                 "Can't delete temporary snapshot file: {}, retrying...", tmpFile.getAbsolutePath());
+           }
+         }
+       }
+     } finally {
+       readWriteLock.writeLock().unlock();
+     }
+   }
+
+   /**
+    * Replaces the table content with the snapshot found in {@code snapshotDir}.
+    *
+    * <p>A missing snapshot file is what {@link #processTakeSnapshot(File)} produces for an empty
+    * table, so in that case the table is cleared. The table is only replaced after the whole
+    * snapshot has been read successfully.
+    *
+    * @param snapshotDir directory that holds the snapshot file
+    * @throws IOException if the snapshot cannot be read or contains an illegal path
+    */
+   public void processLoadSnapshot(File snapshotDir) throws IOException {
+     readWriteLock.writeLock().lock();
+     try {
+       File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
+       if (!snapshotFile.exists()) {
+         // No file means "no pre-set entries"; do not keep entries from before the load.
+         templatePreSetMap.clear();
+         return;
+       }
+
+       if (!snapshotFile.isFile()) {
+         LOGGER.error(
+             "Failed to load snapshot of TemplatePreSetTable,snapshot file [{}] is not a valid file.",
+             snapshotFile.getAbsolutePath());
+         return;
+       }
+
+       Map<Integer, Set<PartialPath>> loaded;
+       try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
+           BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream)) {
+         loaded = deserialize(bufferedInputStream);
+       }
+       // Swap only after a complete read so a broken snapshot cannot leave a half-loaded table.
+       templatePreSetMap.clear();
+       templatePreSetMap.putAll(loaded);
+     } finally {
+       readWriteLock.writeLock().unlock();
+     }
+   }
+
+   // Layout: template count, then per template its id, its path count and each full path string.
+   private void serialize(OutputStream outputStream) throws IOException {
+     ReadWriteIOUtils.write(templatePreSetMap.size(), outputStream);
+     for (Map.Entry<Integer, Set<PartialPath>> entry : templatePreSetMap.entrySet()) {
+       ReadWriteIOUtils.write(entry.getKey(), outputStream);
+       ReadWriteIOUtils.write(entry.getValue().size(), outputStream);
+       for (PartialPath preSetPath : entry.getValue()) {
+         ReadWriteIOUtils.write(preSetPath.getFullPath(), outputStream);
+       }
+     }
+   }
+
+   // Reads the layout written by serialize into a fresh map; an unparsable path is reported as an
+   // IOException instead of being dropped silently.
+   private Map<Integer, Set<PartialPath>> deserialize(InputStream inputStream) throws IOException {
+     Map<Integer, Set<PartialPath>> result = new ConcurrentHashMap<>();
+     int templateNum = ReadWriteIOUtils.readInt(inputStream);
+     for (int i = 0; i < templateNum; i++) {
+       int templateId = ReadWriteIOUtils.readInt(inputStream);
+       int preSetPathNum = ReadWriteIOUtils.readInt(inputStream);
+       Set<PartialPath> set = Collections.synchronizedSet(new HashSet<>());
+       for (int j = 0; j < preSetPathNum; j++) {
+         String fullPath = ReadWriteIOUtils.readString(inputStream);
+         try {
+           set.add(new PartialPath(fullPath));
+         } catch (IllegalPathException e) {
+           throw new IOException(
+               "Illegal path " + fullPath + " found in snapshot of TemplatePreSetTable", e);
+         }
+       }
+       result.put(templateId, set);
+     }
+     return result;
+   }
+ }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java
index 9c30a19165..5113f41a31 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java
@@ -50,7 +50,6 @@ public class TemplateTable {
private static final Logger LOGGER = LoggerFactory.getLogger(TemplateTable.class);
- // StorageGroup read write lock
private final ReentrantReadWriteLock templateReadWriteLock;
private final AtomicInteger templateIdGenerator;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
new file mode 100644
index 0000000000..6d60041675
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.schema;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
+import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.request.write.template.PreSetSchemaTemplatePlan;
+import org.apache.iotdb.confignode.consensus.response.template.TemplateInfoResp;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.state.schema.SetTemplateState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException;
+import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUpdateType;
+import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUtil;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+
+public class SetTemplateProcedure
+ extends StateMachineProcedure<ConfigNodeProcedureEnv, SetTemplateState> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SetTemplateProcedure.class);
+
+ private String queryId;
+ private String templateName;
+ private String templateSetPath;
+
+ // No-arg constructor; leaves queryId, templateName and templateSetPath unset.
+ // NOTE(review): presumably used when a procedure is re-created from its serialized form - confirm.
+ public SetTemplateProcedure() {
+ super();
+ }
+
+ /**
+ * Creates a procedure that sets a schema template on a path.
+ *
+ * @param queryId id of the query that requested the operation
+ * @param templateName name of the template to set
+ * @param templateSetPath path the template is set on
+ */
+ public SetTemplateProcedure(String queryId, String templateName, String templateSetPath) {
+ super();
+ this.queryId = queryId;
+ this.templateName = templateName;
+ this.templateSetPath = templateSetPath;
+ }
+
+ /**
+  * Runs the step that belongs to {@code state} and logs how long it took.
+  *
+  * <p>Each step method decides the follow-up itself: it either registers the next state or marks
+  * the procedure as failed. COMMIT_RELEASE is the last step.
+  *
+  * @param env procedure environment giving access to the config manager
+  * @param state state to execute
+  * @return {@code Flow.NO_MORE_STATE} after COMMIT_RELEASE or for an unrecognized state, otherwise
+  *     {@code Flow.HAS_MORE_STATE}
+  */
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTemplateState state)
+     throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+   long startTime = System.currentTimeMillis();
+   try {
+     switch (state) {
+       case VALIDATE_TEMPLATE_EXISTENCE:
+         LOGGER.info(
+             "Check template existence set on path {} when try setting template {}",
+             templateSetPath,
+             templateName);
+         validateTemplateExistence(env);
+         break;
+       case PRE_SET:
+         LOGGER.info("Pre set schema template {} on path {}", templateName, templateSetPath);
+         preSetTemplate(env);
+         break;
+       case PRE_RELEASE:
+         LOGGER.info(
+             "Pre release schema template {} set on path {}", templateName, templateSetPath);
+         preReleaseTemplate(env);
+         break;
+       case VALIDATE_TIMESERIES_EXISTENCE:
+         LOGGER.info(
+             "Check timeseries existence under path {} when try setting template {}",
+             templateSetPath,
+             templateName);
+         validateTimeSeriesExistence(env);
+         break;
+       case COMMIT_SET:
+         LOGGER.info("Commit set schema template {} on path {}", templateName, templateSetPath);
+         // commitSetTemplate registers COMMIT_RELEASE itself when the write succeeds and marks
+         // the procedure as failed otherwise, so the next state is not registered again here.
+         commitSetTemplate(env);
+         break;
+       case COMMIT_RELEASE:
+         LOGGER.info(
+             "Commit release schema template {} set on path {}", templateName, templateSetPath);
+         commitReleaseTemplate(env);
+         return Flow.NO_MORE_STATE;
+       default:
+         setFailure(new ProcedureException("Unrecognized SetTemplateState " + state.toString()));
+         return Flow.NO_MORE_STATE;
+     }
+     return Flow.HAS_MORE_STATE;
+   } finally {
+     LOGGER.info(
+         "SetSchemaTemplate-[{}] costs {}ms", state, (System.currentTimeMillis() - startTime));
+   }
+ }
+
+ /**
+  * Asks the consensus layer whether the template may be set on the target path. Moves on to
+  * PRE_SET when the answer is a success status; otherwise fails the procedure with the message and
+  * code of the returned status.
+  */
+ private void validateTemplateExistence(ConfigNodeProcedureEnv env) {
+   TemplateInfoResp settableResp =
+       (TemplateInfoResp)
+           env.getConfigManager()
+               .getConsensusManager()
+               .read(new CheckTemplateSettablePlan(templateName, templateSetPath))
+               .getDataset();
+
+   if (settableResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     setFailure(
+         new ProcedureException(
+             new IoTDBException(
+                 settableResp.getStatus().getMessage(), settableResp.getStatus().getCode())));
+     return;
+   }
+   setNextState(SetTemplateState.PRE_SET);
+ }
+
+ /**
+  * Writes a pre-set plan for the template and path through the consensus layer. Moves on to
+  * PRE_RELEASE on success; otherwise logs a warning and fails the procedure with the returned
+  * status.
+  */
+ private void preSetTemplate(ConfigNodeProcedureEnv env) {
+   TSStatus writeResult =
+       env.getConfigManager()
+           .getConsensusManager()
+           .write(new PreSetSchemaTemplatePlan(templateName, templateSetPath))
+           .getStatus();
+
+   if (writeResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     LOGGER.warn(
+         "Failed to pre set template {} on path {} due to {}",
+         templateName,
+         templateSetPath,
+         writeResult.getMessage());
+     setFailure(
+         new ProcedureException(
+             new IoTDBException(writeResult.getMessage(), writeResult.getCode())));
+     return;
+   }
+   setNextState(SetTemplateState.PRE_RELEASE);
+ }
+
+ /**
+ * Sends the template's pre-set info for the target path to every registered DataNode. Moves on to
+ * VALIDATE_TIMESERIES_EXISTENCE when all DataNodes answer with a success status; fails the
+ * procedure on the first non-success answer encountered while iterating the response map.
+ */
+ private void preReleaseTemplate(ConfigNodeProcedureEnv env) {
+ Template template = getTemplate(env);
+ if (template == null) {
+ // getTemplate has already marked the procedure as failed
+ return;
+ }
+
+ TUpdateTemplateReq req = new TUpdateTemplateReq();
+ req.setType(TemplateInternalRPCUpdateType.ADD_TEMPLATE_PRE_SET_INFO.toByte());
+ req.setTemplateInfo(
+ TemplateInternalRPCUtil.generateAddTemplateSetInfoBytes(template, templateSetPath));
+
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+ AsyncClientHandler<TUpdateTemplateReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.UPDATE_TEMPLATE, req, dataNodeLocationMap);
+ // NOTE(review): the responses are read right after this call, so it presumably blocks until
+ // every DataNode has answered or retries are exhausted - confirm.
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
+ for (Map.Entry<Integer, TSStatus> entry : statusMap.entrySet()) {
+ if (entry.getValue().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Failed to sync template {} pre-set info on path {} to DataNode {}",
+ templateName,
+ templateSetPath,
+ dataNodeLocationMap.get(entry.getKey()));
+ setFailure(new ProcedureException(new MetadataException("Pre set template failed")));
+ return;
+ }
+ }
+ setNextState(SetTemplateState.VALIDATE_TIMESERIES_EXISTENCE);
+ }
+
+ /**
+  * Reads the template named {@code templateName} through the consensus layer.
+  *
+  * @return the first template of the response, or {@code null} after marking the procedure as
+  *     failed when the read returns a non-success status or no template at all
+  */
+ private Template getTemplate(ConfigNodeProcedureEnv env) {
+   TemplateInfoResp lookupResp =
+       (TemplateInfoResp)
+           env.getConfigManager()
+               .getConsensusManager()
+               .read(new GetSchemaTemplatePlan(templateName))
+               .getDataset();
+
+   boolean readSucceeded =
+       lookupResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+   if (!readSucceeded) {
+     setFailure(
+         new ProcedureException(
+             new IoTDBException(
+                 lookupResp.getStatus().getMessage(), lookupResp.getStatus().getCode())));
+     return null;
+   }
+
+   boolean nothingFound =
+       lookupResp.getTemplateList() == null || lookupResp.getTemplateList().isEmpty();
+   if (nothingFound) {
+     setFailure(new ProcedureException(new UndefinedTemplateException(templateName)));
+     return null;
+   }
+   return lookupResp.getTemplateList().get(0);
+ }
+
+ /**
+  * Checks on the related schema regions that no timeseries exists on or under the target path.
+  *
+  * <p>Moves on to COMMIT_SET only when every response reports that nothing exists. The procedure
+  * is marked as failed, and the state is left untouched, when the path pattern cannot be built,
+  * when all replicas of a schema region fail, or when any response reports an existing timeseries.
+  */
+ private void validateTimeSeriesExistence(ConfigNodeProcedureEnv env) {
+   PathPatternTree patternTree = new PathPatternTree();
+   ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+   DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+   PartialPath path;
+   try {
+     path = new PartialPath(templateSetPath);
+     // Cover the path itself and everything below it.
+     patternTree.appendPathPattern(path);
+     patternTree.appendPathPattern(path.concatNode(MULTI_LEVEL_PATH_WILDCARD));
+     patternTree.serialize(dataOutputStream);
+   } catch (IllegalPathException | IOException e) {
+     // Without a valid pattern tree the check below would run against nothing, so stop here.
+     LOGGER.warn(
+         "Failed to build path pattern tree for template set path {}", templateSetPath, e);
+     setFailure(
+         new ProcedureException(
+             new MetadataException(
+                 String.format(
+                     "Failed to build path pattern tree for template set path %s: %s",
+                     templateSetPath, e.getMessage()))));
+     return;
+   }
+   ByteBuffer patternTreeBytes = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+
+   Map<TConsensusGroupId, TRegionReplicaSet> relatedSchemaRegionGroup =
+       env.getConfigManager().getRelatedSchemaRegionGroup(patternTree);
+
+   List<TCheckTimeSeriesExistenceResp> respList = new ArrayList<>();
+   DataNodeRegionTaskExecutor<TCheckTimeSeriesExistenceReq, TCheckTimeSeriesExistenceResp>
+       regionTask =
+           new DataNodeRegionTaskExecutor<
+               TCheckTimeSeriesExistenceReq, TCheckTimeSeriesExistenceResp>(
+               env,
+               relatedSchemaRegionGroup,
+               false,
+               DataNodeRequestType.CHECK_TIMESERIES_EXISTENCE,
+               ((dataNodeLocation, consensusGroupIdList) ->
+                   new TCheckTimeSeriesExistenceReq(patternTreeBytes, consensusGroupIdList))) {
+
+             @Override
+             protected List<TConsensusGroupId> processResponseOfOneDataNode(
+                 TDataNodeLocation dataNodeLocation,
+                 List<TConsensusGroupId> consensusGroupIdList,
+                 TCheckTimeSeriesExistenceResp response) {
+               respList.add(response);
+               List<TConsensusGroupId> failedRegionList = new ArrayList<>();
+               if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+                 return failedRegionList;
+               }
+
+               if (response.getStatus().getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+                 // A region only counts as failed when its sub status is neither success nor
+                 // "timeseries already exists"; the latter is an answer, not an error.
+                 List<TSStatus> subStatus = response.getStatus().getSubStatus();
+                 for (int i = 0; i < subStatus.size(); i++) {
+                   if (subStatus.get(i).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+                       && subStatus.get(i).getCode()
+                           != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+                     failedRegionList.add(consensusGroupIdList.get(i));
+                   }
+                 }
+               } else {
+                 failedRegionList.addAll(consensusGroupIdList);
+               }
+               return failedRegionList;
+             }
+
+             @Override
+             protected void onAllReplicasetFailure(
+                 TConsensusGroupId consensusGroupId, Set<TDataNodeLocation> dataNodeLocationSet) {
+               setFailure(
+                   new ProcedureException(
+                       new MetadataException(
+                           String.format(
+                               "Set template %s to %s failed when [check timeseries existence on DataNode] because all replicaset of schemaRegion %s failed. %s",
+                               templateName,
+                               templateSetPath,
+                               consensusGroupId.id,
+                               dataNodeLocationSet))));
+               interruptTask();
+             }
+           };
+   regionTask.execute();
+   if (isFailed()) {
+     return;
+   }
+
+   for (TCheckTimeSeriesExistenceResp resp : respList) {
+     if (resp.isExists()) {
+       setFailure(new ProcedureException(new TemplateImcompatibeException(templateName, path)));
+       // Do not fall through to COMMIT_SET once the check has failed.
+       return;
+     }
+   }
+   setNextState(SetTemplateState.COMMIT_SET);
+ }
+
+ /**
+  * Writes a commit-set plan for the template and path through the consensus layer. Moves on to
+  * COMMIT_RELEASE on success; otherwise logs a warning and fails the procedure with the returned
+  * status.
+  */
+ private void commitSetTemplate(ConfigNodeProcedureEnv env) {
+   TSStatus writeResult =
+       env.getConfigManager()
+           .getConsensusManager()
+           .write(new CommitSetSchemaTemplatePlan(templateName, templateSetPath))
+           .getStatus();
+
+   if (writeResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     LOGGER.warn(
+         "Failed to commit set template {} on path {} due to {}",
+         templateName,
+         templateSetPath,
+         writeResult.getMessage());
+     setFailure(
+         new ProcedureException(
+             new IoTDBException(writeResult.getMessage(), writeResult.getCode())));
+     return;
+   }
+   setNextState(SetTemplateState.COMMIT_RELEASE);
+ }
+
+ /**
+ * Sends the template's commit-set info for the target path to every registered DataNode. Fails the
+ * procedure on the first non-success answer encountered while iterating the response map. No next
+ * state is registered here on success.
+ */
+ private void commitReleaseTemplate(ConfigNodeProcedureEnv env) {
+ Template template = getTemplate(env);
+ if (template == null) {
+ // getTemplate has already marked the procedure as failed
+ return;
+ }
+
+ TUpdateTemplateReq req = new TUpdateTemplateReq();
+ req.setType(TemplateInternalRPCUpdateType.COMMIT_TEMPLATE_SET_INFO.toByte());
+ req.setTemplateInfo(
+ TemplateInternalRPCUtil.generateAddTemplateSetInfoBytes(template, templateSetPath));
+
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+ AsyncClientHandler<TUpdateTemplateReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.UPDATE_TEMPLATE, req, dataNodeLocationMap);
+ // NOTE(review): the responses are read right after this call, so it presumably blocks until
+ // every DataNode has answered or retries are exhausted - confirm.
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
+ for (Map.Entry<Integer, TSStatus> entry : statusMap.entrySet()) {
+ if (entry.getValue().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Failed to sync template {} commit-set info on path {} to DataNode {}",
+ templateName,
+ templateSetPath,
+ dataNodeLocationMap.get(entry.getKey()));
+ setFailure(
+ new ProcedureException(
+ new MetadataException(
+ String.format(
+ "Failed to set schema template %s on path %s because there's failure on DataNode %s",
+ templateName, templateSetPath, dataNodeLocationMap.get(entry.getKey())))));
+ return;
+ }
+ }
+ }
+
+ // Placeholder with an empty body: it currently does nothing and is not called by any method
+ // visible in this part of the class.
+ private void submitTemplateMaintainTask(TDataNodeLocation dataNodeLocation) {
+ // todo implement async retry
+
+ }
+
+ // Rollback is reported as supported for every state, including those for which rollbackState
+ // performs no action.
+ @Override
+ protected boolean isRollbackSupported(SetTemplateState setTemplateState) {
+ return true;
+ }
+
+ /**
+ * Reverts the step that belongs to {@code state} and logs how long the rollback took.
+ *
+ * <p>Only PRE_SET, PRE_RELEASE and COMMIT_SET have rollback actions; any other state falls
+ * through the switch without doing anything.
+ *
+ * @param env procedure environment giving access to the config manager
+ * @param state state whose effect is reverted
+ */
+ @Override
+ protected void rollbackState(ConfigNodeProcedureEnv env, SetTemplateState state)
+ throws IOException, InterruptedException, ProcedureException {
+ long startTime = System.currentTimeMillis();
+ try {
+ switch (state) {
+ case PRE_SET:
+ LOGGER.info(
+ "Start rollback pre set schema template {} on path {}",
+ templateName,
+ templateSetPath);
+ rollbackPreSet(env);
+ break;
+ case PRE_RELEASE:
+ LOGGER.info(
+ "Start rollback pre release schema template {} on path {}",
+ templateName,
+ templateSetPath);
+ rollbackPreRelease(env);
+ break;
+ case COMMIT_SET:
+ LOGGER.info(
+ "Start rollback commit set schema template {} on path {}",
+ templateName,
+ templateSetPath);
+ rollbackCommitSet(env);
+ break;
+ }
+ } finally {
+ LOGGER.info(
+ "Rollback SetTemplate-{} costs {}ms.", state, (System.currentTimeMillis() - startTime));
+ }
+ }
+
+ /**
+  * Writes a {@link PreSetSchemaTemplatePlan} built with the flag {@code true} through the
+  * consensus layer (the flag presumably marks the plan as a rollback -- confirm against the plan
+  * class). If the write does not report success, the failure is logged and recorded on this
+  * procedure via {@code setFailure}.
+  *
+  * @param env procedure environment giving access to the consensus manager
+  */
+ private void rollbackPreSet(ConfigNodeProcedureEnv env) {
+   PreSetSchemaTemplatePlan rollbackPlan =
+       new PreSetSchemaTemplatePlan(templateName, templateSetPath, true);
+   TSStatus rollbackStatus =
+       env.getConfigManager().getConsensusManager().write(rollbackPlan).getStatus();
+   if (rollbackStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     return;
+   }
+
+   LOGGER.warn(
+       "Failed to rollback pre set template {} on path {} due to {}",
+       templateName,
+       templateSetPath,
+       rollbackStatus.getMessage());
+   setFailure(
+       new ProcedureException(
+           new IoTDBException(rollbackStatus.getMessage(), rollbackStatus.getCode())));
+ }
+
+ /**
+  * Sends an INVALIDATE_TEMPLATE_SET_INFO update for this template and path to every registered
+  * DataNode. Each DataNode that does not answer with a success status is logged and causes
+  * {@code setFailure} to be called.
+  *
+  * <p>Returns without sending anything when {@code getTemplate} yields {@code null}.
+  *
+  * @param env procedure environment giving access to the node manager
+  */
+ private void rollbackPreRelease(ConfigNodeProcedureEnv env) {
+   Template targetTemplate = getTemplate(env);
+   if (targetTemplate == null) {
+     // already setFailure
+     return;
+   }
+
+   Map<Integer, TDataNodeLocation> registeredDataNodes =
+       env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+
+   TUpdateTemplateReq invalidateReq = new TUpdateTemplateReq();
+   invalidateReq.setType(TemplateInternalRPCUpdateType.INVALIDATE_TEMPLATE_SET_INFO.toByte());
+   invalidateReq.setTemplateInfo(
+       TemplateInternalRPCUtil.generateInvalidateTemplateSetInfoBytes(
+           targetTemplate.getId(), templateSetPath));
+
+   AsyncClientHandler<TUpdateTemplateReq, TSStatus> handler =
+       new AsyncClientHandler<>(
+           DataNodeRequestType.UPDATE_TEMPLATE, invalidateReq, registeredDataNodes);
+   AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(handler);
+
+   // all dataNodes must clear the related template cache
+   for (Map.Entry<Integer, TSStatus> response : handler.getResponseMap().entrySet()) {
+     if (response.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       continue;
+     }
+     LOGGER.error(
+         "Failed to rollback pre release template info of template {} set on path {} on DataNode {}",
+         targetTemplate.getName(),
+         templateSetPath,
+         registeredDataNodes.get(response.getKey()));
+     setFailure(
+         new ProcedureException(new MetadataException("Rollback pre release template failed")));
+   }
+ }
+
+ /**
+  * Writes a {@link CommitSetSchemaTemplatePlan} built with the flag {@code true} through the
+  * consensus layer (the flag presumably marks the plan as a rollback -- confirm against the plan
+  * class). If the write does not report success, the failure is logged and recorded on this
+  * procedure via {@code setFailure}.
+  *
+  * @param env procedure environment giving access to the consensus manager
+  */
+ private void rollbackCommitSet(ConfigNodeProcedureEnv env) {
+   CommitSetSchemaTemplatePlan rollbackPlan =
+       new CommitSetSchemaTemplatePlan(templateName, templateSetPath, true);
+   TSStatus rollbackStatus =
+       env.getConfigManager().getConsensusManager().write(rollbackPlan).getStatus();
+   if (rollbackStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     return;
+   }
+
+   LOGGER.warn(
+       "Failed to rollback commit set template {} on path {} due to {}",
+       templateName,
+       templateSetPath,
+       rollbackStatus.getMessage());
+   setFailure(
+       new ProcedureException(
+           new IoTDBException(rollbackStatus.getMessage(), rollbackStatus.getCode())));
+ }
+
+ /**
+  * Maps a numeric state id back to its {@link SetTemplateState} by treating the id as an index
+  * into {@code SetTemplateState.values()}, i.e. the constant's ordinal.
+  *
+  * @param stateId ordinal of the wanted state; an id outside the enum's range makes the array
+  *     access throw
+  * @return the state declared at that position
+  */
+ @Override
+ protected SetTemplateState getState(int stateId) {
+ return SetTemplateState.values()[stateId];
+ }
+
+ /**
+  * Returns the numeric id of a state, which is simply its ordinal in {@link SetTemplateState}.
+  * This is the inverse of {@code getState(int)}.
+  *
+  * @param state the state to encode
+  * @return the ordinal of {@code state}
+  */
+ @Override
+ protected int getStateId(SetTemplateState state) {
+ return state.ordinal();
+ }
+
+ /**
+  * Returns the state this procedure starts in.
+  *
+  * @return {@link SetTemplateState#VALIDATE_TEMPLATE_EXISTENCE}
+  */
+ @Override
+ protected SetTemplateState getInitialState() {
+ return SetTemplateState.VALIDATE_TEMPLATE_EXISTENCE;
+ }
+
+ /** Returns the query id stored in this procedure. */
+ public String getQueryId() {
+ return queryId;
+ }
+
+ /** Returns the name of the schema template this procedure sets. */
+ public String getTemplateName() {
+ return templateName;
+ }
+
+ /** Returns the path, as a string, on which the template is being set. */
+ public String getTemplateSetPath() {
+ return templateSetPath;
+ }
+
+ /**
+  * Writes this procedure to {@code stream} in the following order: the type code of
+  * {@code ProcedureType.SET_TEMPLATE_PROCEDURE} as a short, the superclass fields, then
+  * {@code queryId}, {@code templateName} and {@code templateSetPath}.
+  *
+  * <p>{@code deserialize} reads the three strings back in exactly this order, so the two methods
+  * must be changed together.
+  *
+  * @param stream destination of the serialized form
+  * @throws IOException if writing to the stream fails
+  */
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ // type code goes first; deserialize() does not read it back (presumably it is consumed by
+ // the factory that picks the procedure class -- confirm)
+ stream.writeShort(ProcedureType.SET_TEMPLATE_PROCEDURE.getTypeCode());
+ super.serialize(stream);
+ ReadWriteIOUtils.write(queryId, stream);
+ ReadWriteIOUtils.write(templateName, stream);
+ ReadWriteIOUtils.write(templateSetPath, stream);
+ }
+
+ /**
+  * Restores this procedure from {@code byteBuffer}: first the superclass fields, then
+  * {@code queryId}, {@code templateName} and {@code templateSetPath}, in the same order in which
+  * {@code serialize} writes them. The leading type code written by {@code serialize} is not read
+  * here.
+  *
+  * @param byteBuffer buffer positioned at the start of the superclass fields
+  */
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ queryId = ReadWriteIOUtils.readString(byteBuffer);
+ templateName = ReadWriteIOUtils.readString(byteBuffer);
+ templateSetPath = ReadWriteIOUtils.readString(byteBuffer);
+ }
+
+ /**
+  * Two procedures are equal when they are of exactly the same class and agree on both
+  * {@code templateName} and {@code templateSetPath}. {@code queryId} does not take part in the
+  * comparison.
+  *
+  * @param o the object to compare with
+  * @return {@code true} if {@code o} describes the same template/path pair
+  */
+ @Override
+ public boolean equals(Object o) {
+   if (this == o) {
+     return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+     return false;
+   }
+   SetTemplateProcedure other = (SetTemplateProcedure) o;
+   if (!Objects.equals(templateName, other.templateName)) {
+     return false;
+   }
+   return Objects.equals(templateSetPath, other.templateSetPath);
+ }
+
+ /**
+  * Hashes {@code templateName} and {@code templateSetPath}, the same two fields that
+  * {@code equals} compares.
+  */
+ @Override
+ public int hashCode() {
+ return Objects.hash(templateName, templateSetPath);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateImcompatibeException.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTemplateState.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateImcompatibeException.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTemplateState.java
index 9e3a8fb1fd..cc43259a55 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateImcompatibeException.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTemplateState.java
@@ -15,27 +15,15 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.iotdb.db.exception.metadata.template;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-public class TemplateImcompatibeException extends MetadataException {
-
- public TemplateImcompatibeException(String path, String templateName) {
- super(
- String.format("Path [%s] already exists in [%s]", path, templateName),
- TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode());
- this.isUserException = true;
- }
+package org.apache.iotdb.confignode.procedure.state.schema;
- public TemplateImcompatibeException(String path, String templateName, String overlapNodeName) {
- super(
- String.format("Path [%s] overlaps with [%s] on [%s]", path, templateName, overlapNodeName),
- TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode());
- this.isUserException = true;
- }
+/**
+ * States of the set-schema-template procedure, listed in declaration order.
+ *
+ * <p>NOTE(review): SetTemplateProcedure converts a state to its id with {@code ordinal()} and
+ * back with {@code values()[id]}, so reordering or inserting constants changes those ids. If the
+ * ids are persisted, new states should only be appended -- confirm.
+ */
+public enum SetTemplateState {
+ VALIDATE_TEMPLATE_EXISTENCE,
+ PRE_SET,
+ PRE_RELEASE,
+ VALIDATE_TIMESERIES_EXISTENCE,
+ COMMIT_SET,
+ COMMIT_RELEASE,
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 7e016c4dc8..efa89ebef0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
+import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
@@ -123,6 +124,9 @@ public class ProcedureFactory implements IProcedureFactory {
new CreateCQProcedure(
ConfigNode.getInstance().getConfigManager().getCQManager().getExecutor());
break;
+ case SET_TEMPLATE_PROCEDURE:
+ procedure = new SetTemplateProcedure();
+ break;
case DEACTIVATE_TEMPLATE_PROCEDURE:
procedure = new DeactivateTemplateProcedure();
break;
@@ -178,6 +182,8 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.DROP_PIPE_PROCEDURE;
} else if (procedure instanceof CreateCQProcedure) {
return ProcedureType.CREATE_CQ_PROCEDURE;
+ } else if (procedure instanceof SetTemplateProcedure) {
+ return ProcedureType.SET_TEMPLATE_PROCEDURE;
} else if (procedure instanceof DeactivateTemplateProcedure) {
return ProcedureType.DEACTIVATE_TEMPLATE_PROCEDURE;
} else if (procedure instanceof UnsetTemplateProcedure) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 74869f51d3..7bb3bff69f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -56,6 +56,7 @@ public enum ProcedureType {
/** Template */
DEACTIVATE_TEMPLATE_PROCEDURE((short) 700),
UNSET_TEMPLATE_PROCEDURE((short) 701),
+ SET_TEMPLATE_PROCEDURE((short) 702),
/** Ml Model */
CREATE_MODEL_PROCEDURE((short) 800),
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfoTest.java
similarity index 98%
rename from confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfoTest.java
index 9b80ceaf81..43c00b3bd7 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfoTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.persistence;
+package org.apache.iotdb.confignode.persistence.schema;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.utils.PathUtils;
@@ -27,7 +27,6 @@ import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTe
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
-import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/schema/TemplatePreSetTableTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/schema/TemplatePreSetTableTest.java
new file mode 100644
index 0000000000..aff9148d9e
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/schema/TemplatePreSetTableTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence.schema;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+/**
+ * Unit tests for {@link TemplatePreSetTable}: pre-setting and removing template/path pairs, and
+ * round-tripping the table through a snapshot directory.
+ */
+public class TemplatePreSetTableTest {
+
+  private TemplatePreSetTable templatePreSetTable;
+  private final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
+
+  /** Creates a fresh table and makes sure the snapshot directory exists. */
+  @Before
+  public void setup() throws IOException {
+    templatePreSetTable = new TemplatePreSetTable();
+    if (!snapshotDir.exists()) {
+      snapshotDir.mkdirs();
+    }
+  }
+
+  /** Drops the table and removes the snapshot directory with everything in it. */
+  @After
+  public void cleanup() throws IOException {
+    templatePreSetTable = null;
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+  }
+
+  /** Pre-set entries are visible per (templateId, path) pair and can be removed individually. */
+  @Test
+  public void testPreSetTemplate() throws IllegalPathException {
+    int templateId1 = 5;
+    int templateId2 = 10;
+    PartialPath templateSetPath1 = new PartialPath("root.db.t1");
+    PartialPath templateSetPath2 = new PartialPath("root.db.t2");
+    Assert.assertFalse(templatePreSetTable.isPreSet(templateId1, templateSetPath1));
+    Assert.assertFalse(templatePreSetTable.removeSetTemplate(templateId1, templateSetPath1));
+    Assert.assertFalse(templatePreSetTable.isPreSet(templateId2, templateSetPath1));
+    Assert.assertFalse(templatePreSetTable.removeSetTemplate(templateId2, templateSetPath1));
+
+    templatePreSetTable.preSetTemplate(templateId1, templateSetPath1);
+    templatePreSetTable.preSetTemplate(templateId2, templateSetPath1);
+    templatePreSetTable.preSetTemplate(templateId2, templateSetPath2);
+
+    Assert.assertTrue(templatePreSetTable.isPreSet(templateId1, templateSetPath1));
+    Assert.assertTrue(templatePreSetTable.isPreSet(templateId2, templateSetPath1));
+    Assert.assertTrue(templatePreSetTable.isPreSet(templateId2, templateSetPath2));
+
+    Assert.assertTrue(templatePreSetTable.removeSetTemplate(templateId1, templateSetPath1));
+    Assert.assertTrue(templatePreSetTable.removeSetTemplate(templateId2, templateSetPath1));
+
+    Assert.assertFalse(templatePreSetTable.isPreSet(templateId1, templateSetPath1));
+    Assert.assertFalse(templatePreSetTable.isPreSet(templateId2, templateSetPath1));
+    Assert.assertTrue(templatePreSetTable.isPreSet(templateId2, templateSetPath2));
+  }
+
+  /**
+   * A table restored from a snapshot must contain the entries that were pre-set before the
+   * snapshot was taken.
+   */
+  @Test
+  public void testSnapshot() throws IllegalPathException {
+    int templateId1 = 5;
+    int templateId2 = 10;
+    PartialPath templateSetPath1 = new PartialPath("root.db.t1");
+    PartialPath templateSetPath2 = new PartialPath("root.db.t2");
+
+    try {
+      templatePreSetTable.processTakeSnapshot(snapshotDir);
+      TemplatePreSetTable newTemplatePreSetTable = new TemplatePreSetTable();
+
+      Assert.assertFalse(newTemplatePreSetTable.isPreSet(templateId1, templateSetPath1));
+      Assert.assertFalse(newTemplatePreSetTable.isPreSet(templateId2, templateSetPath1));
+      Assert.assertFalse(newTemplatePreSetTable.isPreSet(templateId2, templateSetPath2));
+
+      templatePreSetTable.preSetTemplate(templateId1, templateSetPath1);
+      templatePreSetTable.preSetTemplate(templateId2, templateSetPath1);
+      templatePreSetTable.preSetTemplate(templateId2, templateSetPath2);
+
+      // Empty the directory first so the snapshot taken above cannot block or shadow the new
+      // one (whether processTakeSnapshot overwrites an existing file is not visible from here).
+      FileUtils.cleanDirectory(snapshotDir);
+      templatePreSetTable.processTakeSnapshot(snapshotDir);
+      newTemplatePreSetTable = new TemplatePreSetTable();
+      newTemplatePreSetTable.processLoadSnapshot(snapshotDir);
+
+      // Check the table that was restored from the snapshot, not the one it was taken from.
+      Assert.assertTrue(newTemplatePreSetTable.isPreSet(templateId1, templateSetPath1));
+      Assert.assertTrue(newTemplatePreSetTable.isPreSet(templateId2, templateSetPath1));
+      Assert.assertTrue(newTemplatePreSetTable.isPreSet(templateId2, templateSetPath2));
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TemplateTableTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/schema/TemplateTableTest.java
similarity index 97%
rename from confignode/src/test/java/org/apache/iotdb/confignode/persistence/TemplateTableTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/persistence/schema/TemplateTableTest.java
index 7da7b6cc6c..f1a33cb580 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TemplateTableTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/schema/TemplateTableTest.java
@@ -17,11 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.confignode.persistence;
+package org.apache.iotdb.confignode.persistence.schema;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.confignode.persistence.schema.TemplateTable;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeactivateTemplateIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeactivateTemplateIT.java
index 5b6d4ebda3..590a349a99 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeactivateTemplateIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeactivateTemplateIT.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.it.schema;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.After;
@@ -35,7 +36,7 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
-@Category({ClusterIT.class})
+@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBDeactivateTemplateIT extends AbstractSchemaIT {
public IoTDBDeactivateTemplateIT(SchemaTestMode schemaTestMode) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSchemaTemplateIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSchemaTemplateIT.java
index 72728e07c6..ef4300eb6f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSchemaTemplateIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSchemaTemplateIT.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.it.schema;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.After;
@@ -38,11 +39,13 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import static org.junit.Assert.fail;
+
/**
* Notice that, all test begins with "IoTDB" is integration test. All test which will start the
* IoTDB server should be defined as integration test.
*/
-@Category({ClusterIT.class})
+@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBSchemaTemplateIT extends AbstractSchemaIT {
public IoTDBSchemaTemplateIT(SchemaTestMode schemaTestMode) {
@@ -72,7 +75,7 @@ public class IoTDBSchemaTemplateIT extends AbstractSchemaIT {
try {
statement.execute(
"CREATE SCHEMA TEMPLATE str1 (s1 TEXT encoding=GORILLA compressor=SNAPPY, s2 INT32)");
- Assert.fail();
+ fail();
} catch (SQLException e) {
System.out.println(e.getMessage());
Assert.assertEquals(
@@ -84,7 +87,7 @@ public class IoTDBSchemaTemplateIT extends AbstractSchemaIT {
try {
statement.execute(
"CREATE SCHEMA TEMPLATE t1 (s1 INT64 encoding=RLE compressor=SNAPPY, s2 INT32)");
- Assert.fail();
+ fail();
} catch (SQLException e) {
Assert.assertEquals(
TSStatusCode.METADATA_ERROR.getStatusCode() + ": Duplicated template name: t1",
@@ -99,7 +102,7 @@ public class IoTDBSchemaTemplateIT extends AbstractSchemaIT {
// test drop template which has been set
try {
statement.execute("DROP SCHEMA TEMPLATE t1");
- Assert.fail();
+ fail();
} catch (SQLException e) {
Assert.assertEquals(
TSStatusCode.METADATA_ERROR.getStatusCode()
@@ -163,7 +166,7 @@ public class IoTDBSchemaTemplateIT extends AbstractSchemaIT {
try {
statement.execute("UNSET SCHEMA TEMPLATE t1 FROM root.sg1.d1");
- Assert.fail();
+ fail();
} catch (SQLException e) {
Assert.assertEquals(
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode() + ": Template is in use on root.sg1.d1",
@@ -180,7 +183,7 @@ public class IoTDBSchemaTemplateIT extends AbstractSchemaIT {
try {
statement.execute(
"CREATE SCHEMA TEMPLATE t1 (s1 INT64 encoding=RLE compressor=SNAPPY, s2 INT32)");
- Assert.fail();
+ fail();
} catch (SQLException e) {
Assert.assertEquals(
TSStatusCode.METADATA_ERROR.getStatusCode() + ": Duplicated template name: t1",
@@ -244,7 +247,7 @@ public class IoTDBSchemaTemplateIT extends AbstractSchemaIT {
try {
statement.execute("UNSET SCHEMA TEMPLATE t1 FROM root.sg1.d1");
- Assert.fail();
+ fail();
} catch (SQLException e) {
Assert.assertEquals(
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode() + ": Template is in use on root.sg1.d1",
@@ -574,4 +577,82 @@ public class IoTDBSchemaTemplateIT extends AbstractSchemaIT {
}
}
}
+
+ /**
+  * Checks that schema templates and plain timeseries exclude each other on overlapping paths:
+  * a timeseries cannot be created where a template is set, and a template cannot be set where a
+  * timeseries already exists. Every statement that is expected to be rejected is followed by
+  * {@code fail()} so the test cannot pass when no exception is thrown.
+  */
+ @Test
+ public void testTemplateSetAndTimeSeriesExistenceCheck() throws SQLException {
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     // set schema template
+     statement.execute("SET SCHEMA TEMPLATE t1 TO root.sg1.d1");
+     // show paths set schema template
+     String[] expectedResult = new String[] {"root.sg1.d1"};
+     Set<String> expectedResultSet = new HashSet<>(Arrays.asList(expectedResult));
+     try (ResultSet resultSet = statement.executeQuery("SHOW PATHS SET SCHEMA TEMPLATE t1")) {
+       String resultRecord;
+       while (resultSet.next()) {
+         resultRecord = resultSet.getString(1);
+         Assert.assertTrue(expectedResultSet.contains(resultRecord));
+         expectedResultSet.remove(resultRecord);
+       }
+     }
+     Assert.assertEquals(0, expectedResultSet.size());
+
+     try {
+       statement.execute("CREATE TIMESERIES root.sg1.d1.s INT32");
+       fail();
+     } catch (SQLException e) {
+       Assert.assertEquals(
+           "516: Cannot create timeseries [root.sg1.d1.s] since schema template [t1] already set on path [root.sg1.d1].",
+           e.getMessage());
+     }
+
+     // unset schema template
+     statement.execute("UNSET SCHEMA TEMPLATE t1 FROM root.sg1.d1");
+     try (ResultSet resultSet = statement.executeQuery("SHOW PATHS SET SCHEMA TEMPLATE t1")) {
+       Assert.assertFalse(resultSet.next());
+     }
+
+     statement.execute("CREATE TIMESERIES root.sg1.d1.s INT32");
+
+     // the template must be rejected while a timeseries exists under the path
+     try {
+       statement.execute("SET SCHEMA TEMPLATE t1 TO root.sg1.d1");
+       fail();
+     } catch (SQLException e) {
+       Assert.assertEquals(
+           "516: Cannot set schema template [t1] to path [root.sg1.d1] since there's timeseries under path [root.sg1.d1].",
+           e.getMessage());
+     }
+
+     statement.execute("DELETE TIMESERIES root.sg1.d1.s");
+
+     statement.execute("SET SCHEMA TEMPLATE t1 TO root.sg1.d1");
+     expectedResult = new String[] {"root.sg1.d1"};
+     expectedResultSet = new HashSet<>(Arrays.asList(expectedResult));
+     try (ResultSet resultSet = statement.executeQuery("SHOW PATHS SET SCHEMA TEMPLATE t1")) {
+       String resultRecord;
+       while (resultSet.next()) {
+         resultRecord = resultSet.getString(1);
+         Assert.assertTrue(expectedResultSet.contains(resultRecord));
+         expectedResultSet.remove(resultRecord);
+       }
+     }
+     Assert.assertEquals(0, expectedResultSet.size());
+
+     statement.execute("SET SCHEMA TEMPLATE t1 TO root.sg1.d2.tmp.m");
+     // a timeseries on a prefix of the template path must be rejected
+     try {
+       statement.execute("CREATE TIMESERIES root.sg1.d2 INT32");
+       fail();
+     } catch (SQLException e) {
+       Assert.assertEquals(
+           "516: Cannot create timeseries [root.sg1.d2] since schema template [t1] already set on path [root.sg1.d2.tmp.m].",
+           e.getMessage());
+     }
+     // an alias that collides with a node on the template path must be rejected as well
+     try {
+       statement.execute("CREATE TIMESERIES root.sg1.d2.s(tmp) INT32");
+       fail();
+     } catch (SQLException e) {
+       Assert.assertEquals(
+           "516: Cannot create timeseries [root.sg1.d2.s] since schema template [t1] already set on path [root.sg1.d2.tmp.m].",
+           e.getMessage());
+     }
+     statement.execute("CREATE TIMESERIES root.sg1.d2.s INT32");
+   }
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index e3c6690ca7..8eea86c232 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -614,8 +614,8 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
return this.getFullPath().compareTo(partialPath.getFullPath());
}
- public boolean startsWith(String[] otherNodes) {
- for (int i = 0; i < otherNodes.length; i++) {
+ public boolean startsWithOrPrefixOf(String[] otherNodes) {
+ for (int i = 0; i < otherNodes.length && i < nodes.length; i++) {
if (!nodes[i].equals(otherNodes[i])) {
return false;
}
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/path/PartialPathTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/path/PartialPathTest.java
index 1b606dc210..92094ea5c8 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/path/PartialPathTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/path/PartialPathTest.java
@@ -449,7 +449,7 @@ public class PartialPathTest {
PartialPath b = a.concatNode("d1");
Assert.assertEquals("[root, sg1, d1]", Arrays.toString(b.getNodes()));
Assert.assertEquals("root.sg1.d1", b.getFullPath());
- Assert.assertTrue(b.startsWith(arr1));
+ Assert.assertTrue(b.startsWithOrPrefixOf(arr1));
Assert.assertEquals("root", b.getFirstNode());
}
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 0d853b2667..4d5d9a9372 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -209,7 +209,7 @@ public class RSchemaRegion implements ISchemaRegion {
}
@Override
- public String getStorageGroupFullPath() {
+ public String getDatabaseFullPath() {
return storageGroupFullPath;
}
diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 9ac8eb1812..d1f92538bf 100644
--- a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -172,7 +172,7 @@ public class TagSchemaRegion implements ISchemaRegion {
}
@Override
- public String getStorageGroupFullPath() {
+ public String getDatabaseFullPath() {
return storageGroupFullPath;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateImcompatibeException.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateImcompatibeException.java
index 9e3a8fb1fd..e4a0673fc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateImcompatibeException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateImcompatibeException.java
@@ -21,20 +21,26 @@
package org.apache.iotdb.db.exception.metadata.template;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.rpc.TSStatusCode;
public class TemplateImcompatibeException extends MetadataException {
- public TemplateImcompatibeException(String path, String templateName) {
+ public TemplateImcompatibeException(
+ String path, String templateName, PartialPath templateSetPath) {
super(
- String.format("Path [%s] already exists in [%s]", path, templateName),
+ String.format(
+ "Cannot create timeseries [%s] since schema template [%s] already set on path [%s].",
+ path, templateName, templateSetPath),
TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode());
this.isUserException = true;
}
- public TemplateImcompatibeException(String path, String templateName, String overlapNodeName) {
+ public TemplateImcompatibeException(String templateName, PartialPath templateSetPath) {
super(
- String.format("Path [%s] overlaps with [%s] on [%s]", path, templateName, overlapNodeName),
+ String.format(
+ "Cannot set schema template [%s] to path [%s] since there's timeseries under path [%s].",
+ templateName, templateSetPath, templateSetPath),
TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode());
this.isUserException = true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/ConfigMTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/ConfigMTree.java
index a20b453307..170d5127a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/ConfigMTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/ConfigMTree.java
@@ -574,6 +574,10 @@ public class ConfigMTree {
return result;
}
+ /**
+  * Records {@code templateId} as the schema template id of the node at {@code templateSetPath}.
+  * The node is obtained through {@code getNodeWithAutoCreate}, which by its name creates missing
+  * nodes along the path -- confirm against that method.
+  *
+  * @param templateId id of the template to attach
+  * @param templateSetPath path of the node the template is set on
+  * @throws MetadataException if the node cannot be obtained
+  */
+ public void setTemplate(int templateId, PartialPath templateSetPath) throws MetadataException {
+ getNodeWithAutoCreate(templateSetPath).setSchemaTemplateId(templateId);
+ }
+
public void preUnsetTemplate(int templateId, PartialPath path) throws MetadataException {
getNodeSetTemplate(templateId, path).preUnsetSchemaTemplate();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 276161bce5..92b106e0db 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.exception.metadata.MeasurementInBlackListException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
-import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException;
import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.mnode.schemafile.ICachedMNode;
@@ -821,13 +820,6 @@ public class MTreeBelowSGCachedImpl {
cur = child;
}
synchronized (this) {
- for (String measurement : template.getSchemaMap().keySet()) {
- if (store.hasChild(cur, measurement)) {
- throw new TemplateImcompatibeException(
- activatePath.concatNode(measurement).getFullPath(), template.getName());
- }
- }
-
if (cur.isDevice()) {
entityMNode = cur.getAsDeviceMNode();
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index a613e3beff..8d06137c99 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.exception.metadata.MeasurementInBlackListException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
-import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException;
import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
import org.apache.iotdb.db.metadata.MetadataConstant;
@@ -709,13 +708,6 @@ public class MTreeBelowSGMemoryImpl {
IDeviceMNode<IMemMNode> entityMNode;
synchronized (this) {
- for (String measurement : template.getSchemaMap().keySet()) {
- if (cur.hasChild(measurement)) {
- throw new TemplateImcompatibeException(
- activatePath.concatNode(measurement).getFullPath(), template.getName());
- }
- }
-
if (cur.isDevice()) {
entityMNode = cur.getAsDeviceMNode();
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 9db4a6f5d3..97febd7ce5 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -86,7 +86,7 @@ public interface ISchemaRegion {
// region Interfaces for schema region Info query and operation
SchemaRegionId getSchemaRegionId();
- String getStorageGroupFullPath();
+ String getDatabaseFullPath();
// delete this schemaRegion and clear all resources
void deleteSchemaRegion() throws MetadataException;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index c2a31d3bd6..83e4b81eec 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -254,16 +254,14 @@ public class SchemaEngine {
PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
if (schemaRegion != null) {
- if (schemaRegion.getStorageGroupFullPath().equals(storageGroup.getFullPath())) {
+ if (schemaRegion.getDatabaseFullPath().equals(storageGroup.getFullPath())) {
return;
} else {
throw new MetadataException(
String.format(
"SchemaRegion [%s] is duplicated between [%s] and [%s], "
+ "and the former one has been recovered.",
- schemaRegionId,
- schemaRegion.getStorageGroupFullPath(),
- storageGroup.getFullPath()));
+ schemaRegionId, schemaRegion.getDatabaseFullPath(), storageGroup.getFullPath()));
}
}
schemaRegionMap.put(
@@ -317,7 +315,7 @@ public class SchemaEngine {
schemaRegionMap.remove(schemaRegionId);
// check whether the sg dir is empty
- File sgDir = new File(config.getSchemaDir(), schemaRegion.getStorageGroupFullPath());
+ File sgDir = new File(config.getSchemaDir(), schemaRegion.getDatabaseFullPath());
File[] regionDirList =
sgDir.listFiles(
(dir, name) -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 45d88abc19..bdd8300746 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -380,7 +380,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
// region Interfaces for schema region Info query and operation
@Override
- public String getStorageGroupFullPath() {
+ public String getDatabaseFullPath() {
return storageGroupFullPath;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 1768ba2713..d46a9648c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -438,7 +438,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
// region Interfaces for schema region Info query and operation
@Override
- public String getStorageGroupFullPath() {
+ public String getDatabaseFullPath() {
return storageGroupFullPath;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
index 2af16c8e74..b1cde92fc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
@@ -38,20 +38,25 @@ import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -66,11 +71,17 @@ public class ClusterTemplateManager implements ITemplateManager {
private final Map<Integer, Template> templateIdMap = new ConcurrentHashMap<>();
// <TemplateName, TemplateId>
private final Map<String, Integer> templateNameMap = new ConcurrentHashMap<>();
+
// <FullPath, TemplateId>
private final Map<PartialPath, Integer> pathSetTemplateMap = new ConcurrentHashMap<>();
// <TemplateId, List<FullPath>>
private final Map<Integer, List<PartialPath>> templateSetOnPathsMap = new ConcurrentHashMap<>();
+ // <FullPath, TemplateId>
+ private final Map<PartialPath, Integer> pathPreSetTemplateMap = new ConcurrentHashMap<>();
+ // <TemplateId, Set<FullPath>>
+ private final Map<Integer, Set<PartialPath>> templatePreSetOnPathsMap = new ConcurrentHashMap<>();
+
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static final class ClusterTemplateManagerHolder {
@@ -193,14 +204,36 @@ public class ClusterTemplateManager implements ITemplateManager {
}
@Override
- public void setSchemaTemplate(String name, PartialPath path) {
+ public void setSchemaTemplate(String queryId, String name, PartialPath path) {
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSetSchemaTemplateReq req = new TSetSchemaTemplateReq();
+ req.setQueryId(queryId);
req.setName(name);
req.setPath(path.getFullPath());
- TSStatus tsStatus = configNodeClient.setSchemaTemplate(req);
- if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+
+ TSStatus tsStatus;
+ do {
+ try {
+ tsStatus = configNodeClient.setSchemaTemplate(req);
+ } catch (TTransportException e) {
+ if (e.getType() == TTransportException.TIMED_OUT
+ || e.getCause() instanceof SocketTimeoutException) {
+ // Time out mainly caused by slow execution, just wait
+ tsStatus = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
+ } else {
+ throw e;
+ }
+ }
+ // Keep waiting until task ends
+ } while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() == tsStatus.getCode());
+
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ LOGGER.warn(
+ "Failed to execute set schema template {} on path {} in config node, status is {}.",
+ name,
+ path,
+ tsStatus);
throw new IoTDBException(tsStatus.getMessage(), tsStatus.getCode());
}
} catch (Exception e) {
@@ -246,11 +279,11 @@ public class ClusterTemplateManager implements ITemplateManager {
}
@Override
- public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
+ public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath devicePath) {
readWriteLock.readLock().lock();
try {
for (PartialPath templateSetPath : pathSetTemplateMap.keySet()) {
- if (path.startsWith(templateSetPath.getNodes())) {
+ if (devicePath.startsWithOrPrefixOf(templateSetPath.getNodes())) {
return new Pair<>(
templateIdMap.get(pathSetTemplateMap.get(templateSetPath)), templateSetPath);
}
@@ -261,6 +294,50 @@ public class ClusterTemplateManager implements ITemplateManager {
}
}
+ /**
+  * Looks for a schema template, either already set or currently pre-set, whose path overlaps the
+  * time series that is about to be created.
+  *
+  * <p>Both the time series path itself and, when an alias is given, the path formed by the
+  * device path plus the alias are tested with {@code startsWithOrPrefixOf} against every
+  * template path. Set paths are inspected before pre-set paths.
+  *
+  * @param timeSeriesPath full path of the time series to be created
+  * @param alias alias of the time series, or {@code null} if there is none
+  * @return the overlapping template together with the path it is (pre-)set on, or {@code null}
+  *     when no template path overlaps
+  */
+ @Override
+ public Pair<Template, PartialPath> checkTemplateSetAndPreSetInfo(
+     PartialPath timeSeriesPath, String alias) {
+   // The alias path does not depend on the template path under inspection, so build it once.
+   PartialPath aliasPath =
+       alias == null ? null : timeSeriesPath.getDevicePath().concatNode(alias);
+
+   readWriteLock.readLock().lock();
+   try {
+     for (PartialPath templateSetPath : pathSetTemplateMap.keySet()) {
+       if (timeSeriesPath.startsWithOrPrefixOf(templateSetPath.getNodes())
+           || (aliasPath != null && aliasPath.startsWithOrPrefixOf(templateSetPath.getNodes()))) {
+         return new Pair<>(
+             templateIdMap.get(pathSetTemplateMap.get(templateSetPath)), templateSetPath);
+       }
+     }
+     for (PartialPath templatePreSetPath : pathPreSetTemplateMap.keySet()) {
+       // Only the pre-set path decides the conflict. The former extra condition compared the
+       // time series path with its own alias path and ignored templatePreSetPath entirely.
+       if (timeSeriesPath.startsWithOrPrefixOf(templatePreSetPath.getNodes())
+           || (aliasPath != null
+               && aliasPath.startsWithOrPrefixOf(templatePreSetPath.getNodes()))) {
+         return new Pair<>(
+             templateIdMap.get(pathPreSetTemplateMap.get(templatePreSetPath)), templatePreSetPath);
+       }
+     }
+     return null;
+   } finally {
+     readWriteLock.readLock().unlock();
+   }
+ }
+
+
@Override
public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
readWriteLock.readLock().lock();
@@ -321,7 +398,56 @@ public class ClusterTemplateManager implements ITemplateManager {
return false;
}
+ // Used to sync template info when a DataNode is activated and registers into the cluster.
+ // Both committed set info and pre-set info carried in the payload are recorded.
public void updateTemplateSetInfo(byte[] templateSetInfo) {
+ if (templateSetInfo == null) {
+ return;
+ }
+ readWriteLock.writeLock().lock();
+ try {
+ ByteBuffer buffer = ByteBuffer.wrap(templateSetInfo);
+
+ Map<Template, List<Pair<String, Boolean>>> parsedTemplateSetInfo =
+ TemplateInternalRPCUtil.parseAddAllTemplateSetInfoBytes(buffer);
+ for (Map.Entry<Template, List<Pair<String, Boolean>>> entry :
+ parsedTemplateSetInfo.entrySet()) {
+ Template template = entry.getKey();
+ templateIdMap.put(template.getId(), template);
+ templateNameMap.put(template.getName(), template.getId());
+
+ for (Pair<String, Boolean> pathSetTemplate : entry.getValue()) {
+ try {
+ PartialPath path = new PartialPath(pathSetTemplate.left);
+ if (pathSetTemplate.right) {
+ // flag is true: the template is only pre-set on this path
+ pathPreSetTemplateMap.put(path, template.getId());
+ Set<PartialPath> paths =
+ templatePreSetOnPathsMap.computeIfAbsent(
+ template.getId(), integer -> new HashSet<>());
+ paths.add(path);
+ } else {
+ // flag is false: the template set on this path is committed
+ pathSetTemplateMap.put(path, template.getId());
+ List<PartialPath> pathList =
+ templateSetOnPathsMap.computeIfAbsent(
+ template.getId(), integer -> new ArrayList<>());
+ pathList.add(path);
+ }
+
+ } catch (IllegalPathException ignored) {
+ // NOTE(review): illegal paths are silently skipped; sibling methods assume this won't happen
+ }
+ }
+ }
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+
+ // This is used for rollback template unset operation. The provided template will be directly
+ // added to pathSetTemplateMap without any processing on pathPreSetTemplateMap
+ public void addTemplateSetInfo(byte[] templateSetInfo) {
if (templateSetInfo == null) {
return;
}
@@ -333,19 +459,19 @@ public class ClusterTemplateManager implements ITemplateManager {
TemplateInternalRPCUtil.parseAddTemplateSetInfoBytes(buffer);
for (Map.Entry<Template, List<String>> entry : parsedTemplateSetInfo.entrySet()) {
Template template = entry.getKey();
- templateIdMap.put(template.getId(), template);
- templateNameMap.put(template.getName(), template.getId());
+ int templateId = template.getId();
+ templateIdMap.put(templateId, template);
+ templateNameMap.put(template.getName(), templateId);
for (String pathSetTemplate : entry.getValue()) {
try {
PartialPath path = new PartialPath(pathSetTemplate);
- pathSetTemplateMap.put(path, template.getId());
- List<PartialPath> pathList =
- templateSetOnPathsMap.computeIfAbsent(
- template.getId(), integer -> new ArrayList<>());
- pathList.add(path);
+ pathSetTemplateMap.put(path, templateId);
+ templateSetOnPathsMap
+ .computeIfAbsent(templateId, integer -> new ArrayList<>())
+ .add(path);
} catch (IllegalPathException ignored) {
-
+ // won't happen
}
}
}
@@ -367,15 +493,29 @@ public class ClusterTemplateManager implements ITemplateManager {
String pathSetTemplate = parsedInfo.right;
try {
PartialPath path = new PartialPath(pathSetTemplate);
+
pathSetTemplateMap.remove(path);
if (templateSetOnPathsMap.containsKey(templateId)) {
templateSetOnPathsMap.get(templateId).remove(path);
if (templateSetOnPathsMap.get(templateId).isEmpty()) {
templateSetOnPathsMap.remove(templateId);
- Template template = templateIdMap.remove(templateId);
- templateNameMap.remove(template.getName());
}
}
+
+ pathPreSetTemplateMap.remove(path);
+ if (templatePreSetOnPathsMap.containsKey(templateId)) {
+ templatePreSetOnPathsMap.get(templateId).remove(path);
+ if (templatePreSetOnPathsMap.get(templateId).isEmpty()) {
+ templatePreSetOnPathsMap.remove(templateId);
+ }
+ }
+
+ if (!templateSetOnPathsMap.containsKey(templateId)
+ && !templatePreSetOnPathsMap.containsKey(templateId)) {
+ // such template is useless on DataNode since no related set/preset path
+ Template template = templateIdMap.remove(templateId);
+ templateNameMap.remove(template.getName());
+ }
} catch (IllegalPathException ignored) {
}
@@ -384,6 +524,78 @@ public class ClusterTemplateManager implements ITemplateManager {
}
}
+ /**
+  * Records the given templates as pre-set on the paths listed for them.
+  *
+  * <p>Each template is registered by id and by name, and every listed path is added to the
+  * pre-set lookup maps. A {@code null} payload is ignored.
+  *
+  * @param templateSetInfo serialized template/path info, parsed with {@code
+  *     TemplateInternalRPCUtil.parseAddTemplateSetInfoBytes}; may be {@code null}
+  */
+ public void addTemplatePreSetInfo(byte[] templateSetInfo) {
+   if (templateSetInfo == null) {
+     return;
+   }
+   readWriteLock.writeLock().lock();
+   try {
+     Map<Template, List<String>> preSetInfo =
+         TemplateInternalRPCUtil.parseAddTemplateSetInfoBytes(ByteBuffer.wrap(templateSetInfo));
+     for (Map.Entry<Template, List<String>> templateAndPaths : preSetInfo.entrySet()) {
+       Template template = templateAndPaths.getKey();
+       int templateId = template.getId();
+       templateIdMap.put(templateId, template);
+       templateNameMap.put(template.getName(), templateId);
+
+       for (String rawPath : templateAndPaths.getValue()) {
+         try {
+           PartialPath preSetPath = new PartialPath(rawPath);
+           pathPreSetTemplateMap.put(preSetPath, templateId);
+           templatePreSetOnPathsMap
+               .computeIfAbsent(templateId, id -> new HashSet<>())
+               .add(preSetPath);
+         } catch (IllegalPathException ignored) {
+           // won't happen
+         }
+       }
+     }
+   } finally {
+     readWriteLock.writeLock().unlock();
+   }
+ }
+
+
+ /**
+  * Promotes pre-set template info to committed set info.
+  *
+  * <p>For every template/path pair in the payload, the path is recorded in the set maps and
+  * removed from the pre-set maps. The pre-set removal is null-safe, so a commit that arrives
+  * when no pre-set entry exists for the template (for example a repeated commit) no longer
+  * fails with a NullPointerException. A {@code null} payload is ignored.
+  *
+  * @param templateSetInfo serialized template/path info, parsed with {@code
+  *     TemplateInternalRPCUtil.parseAddTemplateSetInfoBytes}; may be {@code null}
+  */
+ public void commitTemplatePreSetInfo(byte[] templateSetInfo) {
+   if (templateSetInfo == null) {
+     return;
+   }
+   readWriteLock.writeLock().lock();
+   try {
+     ByteBuffer buffer = ByteBuffer.wrap(templateSetInfo);
+
+     Map<Template, List<String>> parsedTemplateSetInfo =
+         TemplateInternalRPCUtil.parseAddTemplateSetInfoBytes(buffer);
+     for (Map.Entry<Template, List<String>> entry : parsedTemplateSetInfo.entrySet()) {
+       Template template = entry.getKey();
+       int templateId = template.getId();
+       templateIdMap.put(templateId, template);
+       templateNameMap.put(template.getName(), templateId);
+
+       for (String pathSetTemplate : entry.getValue()) {
+         try {
+           PartialPath path = new PartialPath(pathSetTemplate);
+           pathSetTemplateMap.put(path, templateId);
+           templateSetOnPathsMap
+               .computeIfAbsent(templateId, integer -> new ArrayList<>())
+               .add(path);
+
+           pathPreSetTemplateMap.remove(path);
+           // Returning null from the remapping function drops the mapping, so an emptied
+           // pre-set collection is removed; a missing mapping is simply left untouched.
+           templatePreSetOnPathsMap.computeIfPresent(
+               templateId,
+               (id, preSetPaths) -> {
+                 preSetPaths.remove(path);
+                 return preSetPaths.isEmpty() ? null : preSetPaths;
+               });
+         } catch (IllegalPathException ignored) {
+           // won't happen
+         }
+       }
+     }
+   } finally {
+     readWriteLock.writeLock().unlock();
+   }
+ }
+
+
@TestOnly
public void putTemplate(Template template) {
templateIdMap.put(template.getId(), template);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/ITemplateManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/ITemplateManager.java
index cff8ce788a..a3088b2adf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/ITemplateManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/ITemplateManager.java
@@ -58,7 +58,7 @@ public interface ITemplateManager {
* @param name templateName
* @param path mount path
*/
- void setSchemaTemplate(String name, PartialPath path);
+ void setSchemaTemplate(String queryId, String name, PartialPath path);
/**
* get info of mounted template
@@ -68,7 +68,10 @@ public interface ITemplateManager {
*/
List<PartialPath> getPathsSetTemplate(String name);
- Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path);
+ Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath devicePath);
+
+ Pair<Template, PartialPath> checkTemplateSetAndPreSetInfo(
+ PartialPath timeSeriesPath, String alias);
Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java
index e51c943356..7e15b66038 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUpdateType.java
@@ -27,7 +27,9 @@ import java.nio.ByteBuffer;
public enum TemplateInternalRPCUpdateType {
ADD_TEMPLATE_SET_INFO((byte) 0),
- INVALIDATE_TEMPLATE_SET_INFO((byte) 1);
+ INVALIDATE_TEMPLATE_SET_INFO((byte) 1),
+ ADD_TEMPLATE_PRE_SET_INFO((byte) 2),
+ COMMIT_TEMPLATE_SET_INFO((byte) 3);
private final byte operationType;
@@ -45,14 +47,7 @@ public enum TemplateInternalRPCUpdateType {
public static TemplateInternalRPCUpdateType deserialize(ByteBuffer buffer) {
byte type = ReadWriteIOUtils.readByte(buffer);
- switch (type) {
- case 0:
- return ADD_TEMPLATE_SET_INFO;
- case 1:
- return INVALIDATE_TEMPLATE_SET_INFO;
- default:
- throw new IllegalArgumentException("Unknown template update operation type" + type);
- }
+ return getType(type);
}
public static TemplateInternalRPCUpdateType getType(byte type) {
@@ -61,6 +56,10 @@ public enum TemplateInternalRPCUpdateType {
return ADD_TEMPLATE_SET_INFO;
case 1:
return INVALIDATE_TEMPLATE_SET_INFO;
+ case 2:
+ return ADD_TEMPLATE_PRE_SET_INFO;
+ case 3:
+ return COMMIT_TEMPLATE_SET_INFO;
default:
throw new IllegalArgumentException("Unknown template update operation type" + type);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUtil.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUtil.java
index 8300190b4c..c70a66812f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateInternalRPCUtil.java
@@ -46,16 +46,36 @@ public class TemplateInternalRPCUtil {
return outputStream.toByteArray();
}
- public static byte[] generateAddTemplateSetInfoBytes(
- Map<Template, List<String>> templateSetInfo) {
+ /**
+  * Reads template set info from the buffer.
+  *
+  * <p>Layout read here: an int template count, then per template the serialized template, an
+  * int path count and that many path strings.
+  *
+  * @param buffer buffer positioned at the template count
+  * @return each deserialized template mapped to the paths read for it
+  */
+ public static Map<Template, List<String>> parseAddTemplateSetInfoBytes(ByteBuffer buffer) {
+   final int templateCount = ReadWriteIOUtils.readInt(buffer);
+   final Map<Template, List<String>> parsed = new HashMap<>(templateCount);
+   for (int templateIndex = 0; templateIndex < templateCount; templateIndex++) {
+     Template template = new Template();
+     template.deserialize(buffer);
+
+     final int pathCount = ReadWriteIOUtils.readInt(buffer);
+     final List<String> setPaths = new ArrayList<>(pathCount);
+     for (int pathIndex = 0; pathIndex < pathCount; pathIndex++) {
+       String setPath = ReadWriteIOUtils.readString(buffer);
+       setPaths.add(setPath);
+     }
+     parsed.put(template, setPaths);
+   }
+   return parsed;
+ }
+
+ public static byte[] generateAddAllTemplateSetInfoBytes(
+ Map<Template, List<Pair<String, Boolean>>> templateSetInfo) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
ReadWriteIOUtils.write(templateSetInfo.size(), outputStream);
- for (Map.Entry<Template, List<String>> entry : templateSetInfo.entrySet()) {
+ for (Map.Entry<Template, List<Pair<String, Boolean>>> entry : templateSetInfo.entrySet()) {
entry.getKey().serialize(outputStream);
ReadWriteIOUtils.write(entry.getValue().size(), outputStream);
- for (String templateSetPath : entry.getValue()) {
- ReadWriteIOUtils.write(templateSetPath, outputStream);
+ for (Pair<String, Boolean> templateSetPath : entry.getValue()) {
+ ReadWriteIOUtils.write(templateSetPath.left, outputStream);
+ ReadWriteIOUtils.write(templateSetPath.right, outputStream);
}
}
} catch (IOException ignored) {
@@ -63,11 +83,12 @@ public class TemplateInternalRPCUtil {
return outputStream.toByteArray();
}
- public static Map<Template, List<String>> parseAddTemplateSetInfoBytes(ByteBuffer buffer) {
+ public static Map<Template, List<Pair<String, Boolean>>> parseAddAllTemplateSetInfoBytes(
+ ByteBuffer buffer) {
int templateNum = ReadWriteIOUtils.readInt(buffer);
- Map<Template, List<String>> result = new HashMap<>(templateNum);
+ Map<Template, List<Pair<String, Boolean>>> result = new HashMap<>(templateNum);
int pathNum;
- List<String> templateSetPathList;
+ List<Pair<String, Boolean>> templateSetPathList;
for (int i = 0; i < templateNum; i++) {
Template template = new Template();
template.deserialize(buffer);
@@ -75,7 +96,8 @@ public class TemplateInternalRPCUtil {
pathNum = ReadWriteIOUtils.readInt(buffer);
templateSetPathList = new ArrayList<>(pathNum);
for (int j = 0; j < pathNum; j++) {
- templateSetPathList.add(ReadWriteIOUtils.readString(buffer));
+ templateSetPathList.add(
+ new Pair<>(ReadWriteIOUtils.readString(buffer), ReadWriteIOUtils.readBool(buffer)));
}
result.put(template, templateSetPathList);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index 2c261dbb2b..d9879f511d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -183,7 +183,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
database =
((SchemaDriverContext) operatorContext.getDriverContext())
.getSchemaRegion()
- .getStorageGroupFullPath();
+ .getDatabaseFullPath();
}
return database;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java
index 16e58d2a44..93dee654ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java
@@ -32,6 +32,11 @@ public class SchemaSourceFactory {
private SchemaSourceFactory() {};
+ /**
+  * Convenience overload that builds a {@link TimeSeriesSchemaSource} from a path pattern only;
+  * every other constructor argument is passed as false, 0 or null.
+  *
+  * @param pathPattern path pattern handed to the schema source
+  */
+ public static ISchemaSource<ITimeSeriesSchemaInfo> getTimeSeriesSchemaSource(
+ PartialPath pathPattern) {
+ return new TimeSeriesSchemaSource(pathPattern, false, 0, 0, null, null, false, null);
+ }
+
public static ISchemaSource<ITimeSeriesSchemaInfo> getTimeSeriesSchemaSource(
PartialPath pathPattern,
boolean isPrefixMatch,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index c72d369b28..c4354fbcc5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1725,51 +1725,28 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
}
private void checkIsTemplateCompatible(PartialPath timeseriesPath, String alias) {
- Pair<Template, PartialPath> templateInfo = schemaFetcher.checkTemplateSetInfo(timeseriesPath);
+ Pair<Template, PartialPath> templateInfo =
+ schemaFetcher.checkTemplateSetAndPreSetInfo(timeseriesPath, alias);
if (templateInfo != null) {
- if (templateInfo.left.hasSchema(timeseriesPath.getMeasurement())) {
- throw new RuntimeException(
- new TemplateImcompatibeException(
- timeseriesPath.getFullPath(),
- templateInfo.left.getName(),
- timeseriesPath.getMeasurement()));
- }
-
- if (alias != null && templateInfo.left.hasSchema(alias)) {
- throw new RuntimeException(
- new TemplateImcompatibeException(
- timeseriesPath.getDevicePath().concatNode(alias).getFullPath(),
- templateInfo.left.getName(),
- alias));
- }
+ throw new RuntimeException(
+ new TemplateImcompatibeException(
+ timeseriesPath.getFullPath(), templateInfo.left.getName(), templateInfo.right));
}
}
private void checkIsTemplateCompatible(
PartialPath devicePath, List<String> measurements, List<String> aliasList) {
- Pair<Template, PartialPath> templateInfo = schemaFetcher.checkTemplateSetInfo(devicePath);
- if (templateInfo != null) {
- Template template = templateInfo.left;
- for (String measurement : measurements) {
- if (template.hasSchema(measurement)) {
- throw new RuntimeException(
- new TemplateImcompatibeException(
- devicePath.concatNode(measurement).getFullPath(),
- templateInfo.left.getName(),
- measurement));
- }
- }
-
- if (aliasList == null) {
- return;
- }
-
- for (String alias : aliasList) {
- if (template.hasSchema(alias)) {
- throw new RuntimeException(
- new TemplateImcompatibeException(
- devicePath.concatNode(alias).getFullPath(), templateInfo.left.getName(), alias));
- }
+ for (int i = 0; i < measurements.size(); i++) {
+ Pair<Template, PartialPath> templateInfo =
+ schemaFetcher.checkTemplateSetAndPreSetInfo(
+ devicePath.concatNode(measurements.get(i)),
+ aliasList == null ? null : aliasList.get(i));
+ if (templateInfo != null) {
+ throw new RuntimeException(
+ new TemplateImcompatibeException(
+ devicePath.getFullPath() + measurements,
+ templateInfo.left.getName(),
+ templateInfo.right));
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 29c44080ed..240c5c48b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -418,8 +418,14 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
@Override
- public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
- return templateManager.checkTemplateSetInfo(path);
+ public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath devicePath) {
+ return templateManager.checkTemplateSetInfo(devicePath);
+ }
+
+ @Override
+ public Pair<Template, PartialPath> checkTemplateSetAndPreSetInfo(
+ PartialPath timeSeriesPath, String alias) {
+ return templateManager.checkTemplateSetAndPreSetInfo(timeSeriesPath, alias);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
index 054d3bc1fd..450c4dc3b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
@@ -84,7 +84,10 @@ public interface ISchemaFetcher {
List<CompressionType[]> compressionTypes,
List<Boolean> aligned);
- Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path);
+ Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath devicePath);
+
+ Pair<Template, PartialPath> checkTemplateSetAndPreSetInfo(
+ PartialPath timeSeriesPath, String alias);
Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 7ad92d4c59..a3c56b19e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -333,7 +333,7 @@ public class ConfigTaskVisitor
@Override
public IConfigTask visitSetSchemaTemplate(
SetSchemaTemplateStatement setSchemaTemplateStatement, TaskContext context) {
- return new SetSchemaTemplateTask(setSchemaTemplateStatement);
+ return new SetSchemaTemplateTask(context.getQueryId(), setSchemaTemplateStatement);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 2455c14edc..2dfbdef2c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1267,17 +1267,21 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> setSchemaTemplate(
- SetSchemaTemplateStatement setSchemaTemplateStatement) {
+ String queryId, SetSchemaTemplateStatement setSchemaTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
String templateName = setSchemaTemplateStatement.getTemplateName();
PartialPath path = setSchemaTemplateStatement.getPath();
try {
// Send request to some API server
- ClusterTemplateManager.getInstance().setSchemaTemplate(templateName, path);
+ ClusterTemplateManager.getInstance().setSchemaTemplate(queryId, templateName, path);
// build TSBlock
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- } catch (Exception e) {
- future.setException(e.getCause());
+ } catch (Throwable e) {
+ if (e.getCause() instanceof IoTDBException) {
+ future.setException(e.getCause());
+ } else {
+ future.setException(e);
+ }
}
return future;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index f55e653519..559a4db82d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -133,7 +133,7 @@ public interface IConfigTaskExecutor {
ShowNodesInSchemaTemplateStatement showNodesInSchemaTemplateStatement);
SettableFuture<ConfigTaskResult> setSchemaTemplate(
- SetSchemaTemplateStatement setSchemaTemplateStatement);
+ String queryId, SetSchemaTemplateStatement setSchemaTemplateStatement);
SettableFuture<ConfigTaskResult> showPathSetTemplate(
ShowPathSetTemplateStatement showPathSetTemplateStatement);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/template/SetSchemaTemplateTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/template/SetSchemaTemplateTask.java
index e695d61f86..81a11c30a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/template/SetSchemaTemplateTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/template/SetSchemaTemplateTask.java
@@ -28,15 +28,18 @@ import com.google.common.util.concurrent.ListenableFuture;
public class SetSchemaTemplateTask implements IConfigTask {
+ private final String queryId;
private final SetSchemaTemplateStatement setSchemaTemplateStatement;
- public SetSchemaTemplateTask(SetSchemaTemplateStatement setSchemaTemplateStatement) {
+ public SetSchemaTemplateTask(
+ String queryId, SetSchemaTemplateStatement setSchemaTemplateStatement) {
+ this.queryId = queryId;
this.setSchemaTemplateStatement = setSchemaTemplateStatement;
}
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
throws InterruptedException {
- return configTaskExecutor.setSchemaTemplate(setSchemaTemplateStatement);
+ return configTaskExecutor.setSchemaTemplate(queryId, setSchemaTemplateStatement);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 34db75c702..4704bf122a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -61,6 +61,8 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.settle.SettleRequestHandler;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
+import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
@@ -73,6 +75,8 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceFailureInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.operator.schema.source.ISchemaSource;
+import org.apache.iotdb.db.mpp.execution.operator.schema.source.SchemaSourceFactory;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -123,6 +127,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListWithTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
@@ -416,13 +422,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
AtomicInteger preDeletedNum = new AtomicInteger(0);
TSStatus executionResult =
- executeSchemaDeletionTask(
+ executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
String storageGroup =
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
- .getStorageGroupFullPath();
+ .getDatabaseFullPath();
PathPatternTree filteredPatternTree =
filterPathPatternTree(patternTree, storageGroup);
if (filteredPatternTree.isEmpty()) {
@@ -448,13 +454,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
public TSStatus rollbackSchemaBlackList(TRollbackSchemaBlackListReq req) throws TException {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
- return executeSchemaDeletionTask(
+ return executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
String storageGroup =
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
- .getStorageGroupFullPath();
+ .getDatabaseFullPath();
PathPatternTree filteredPatternTree = filterPathPatternTree(patternTree, storageGroup);
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
@@ -494,7 +500,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()));
PathPatternTree filteredPatternTree =
- filterPathPatternTree(patternTree, schemaRegion.getStorageGroupFullPath());
+ filterPathPatternTree(patternTree, schemaRegion.getDatabaseFullPath());
if (filteredPatternTree.isEmpty()) {
continue;
}
@@ -525,7 +531,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
List<PartialPath> pathList = patternTree.getAllPathPatterns();
- return executeSchemaDeletionTask(
+ return executeInternalSchemaTask(
req.getDataRegionIdList(),
consensusGroupId -> {
RegionWriteExecutor executor = new RegionWriteExecutor();
@@ -541,13 +547,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) throws TException {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
- return executeSchemaDeletionTask(
+ return executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
String storageGroup =
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
- .getStorageGroupFullPath();
+ .getDatabaseFullPath();
PathPatternTree filteredPatternTree = filterPathPatternTree(patternTree, storageGroup);
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
@@ -568,7 +574,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
Map<PartialPath, List<Integer>> templateSetInfo =
transformTemplateSetInfo(req.getTemplateSetInfo());
TSStatus executionResult =
- executeSchemaDeletionTask(
+ executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
Map<PartialPath, List<Integer>> filteredTemplateSetInfo =
@@ -630,7 +636,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
new PartialPath(
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
- .getStorageGroupFullPath());
+ .getDatabaseFullPath());
} catch (IllegalPathException ignored) {
// won't reach here
}
@@ -642,7 +648,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
throws TException {
Map<PartialPath, List<Integer>> templateSetInfo =
transformTemplateSetInfo(req.getTemplateSetInfo());
- return executeSchemaDeletionTask(
+ return executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
Map<PartialPath, List<Integer>> filteredTemplateSetInfo =
@@ -665,7 +671,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
public TSStatus deactivateTemplate(TDeactivateTemplateReq req) throws TException {
Map<PartialPath, List<Integer>> templateSetInfo =
transformTemplateSetInfo(req.getTemplateSetInfo());
- return executeSchemaDeletionTask(
+ return executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
Map<PartialPath, List<Integer>> filteredTemplateSetInfo =
@@ -690,7 +696,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
TCountPathsUsingTemplateResp resp = new TCountPathsUsingTemplateResp();
AtomicLong result = new AtomicLong(0);
resp.setStatus(
- executeSchemaDeletionTask(
+ executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
ReadWriteLock readWriteLock =
@@ -701,7 +707,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()));
PathPatternTree filteredPatternTree =
- filterPathPatternTree(patternTree, schemaRegion.getStorageGroupFullPath());
+ filterPathPatternTree(patternTree, schemaRegion.getDatabaseFullPath());
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
@@ -719,7 +725,70 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
return resp;
}
- private TSStatus executeSchemaDeletionTask(
+ @Override
+ public TCheckTimeSeriesExistenceResp checkTimeSeriesExistence(TCheckTimeSeriesExistenceReq req)
+ throws TException {
+ PathPatternTree patternTree = PathPatternTree.deserialize(req.patternTree);
+ TCheckTimeSeriesExistenceResp resp = new TCheckTimeSeriesExistenceResp();
+ TSStatus status =
+ executeInternalSchemaTask(
+ req.getSchemaRegionIdList(),
+ consensusGroupId -> {
+ ReadWriteLock readWriteLock =
+ regionManager.getRegionLock(new SchemaRegionId(consensusGroupId.getId()));
+ // check timeseries existence for set template shall block all timeseries creation
+ readWriteLock.writeLock().lock();
+ try {
+ ISchemaRegion schemaRegion =
+ schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()));
+ PathPatternTree filteredPatternTree =
+ filterPathPatternTree(patternTree, schemaRegion.getDatabaseFullPath());
+ if (filteredPatternTree.isEmpty()) {
+ return RpcUtils.SUCCESS_STATUS;
+ }
+ for (PartialPath pattern : filteredPatternTree.getAllPathPatterns()) {
+ ISchemaSource<ITimeSeriesSchemaInfo> schemaSource =
+ SchemaSourceFactory.getTimeSeriesSchemaSource(pattern);
+ try (ISchemaReader<ITimeSeriesSchemaInfo> schemaReader =
+ schemaSource.getSchemaReader(schemaRegion)) {
+ if (schemaReader.hasNext()) {
+ return RpcUtils.getStatus(TSStatusCode.TIMESERIES_ALREADY_EXIST);
+ }
+ } catch (Exception e) {
+ LOGGER.warn(e.getMessage(), e);
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
+ }
+ }
+ return RpcUtils.SUCCESS_STATUS;
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ });
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ resp.setStatus(RpcUtils.SUCCESS_STATUS);
+ resp.setExists(false);
+ } else if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ boolean hasFailure = false;
+ for (TSStatus subStatus : status.getSubStatus()) {
+ if (subStatus.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+ resp.setExists(true);
+ } else if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ hasFailure = true;
+ break;
+ }
+ }
+ if (hasFailure) {
+ resp.setStatus(status);
+ } else {
+ resp.setStatus(RpcUtils.SUCCESS_STATUS);
+ }
+ } else {
+ resp.setStatus(status);
+ }
+ return resp;
+ }
+
+ private TSStatus executeInternalSchemaTask(
List<TConsensusGroupId> consensusGroupIdList,
Function<TConsensusGroupId, TSStatus> executeOnOneRegion) {
List<TSStatus> statusList = new ArrayList<>();
@@ -1155,11 +1224,17 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
public TSStatus updateTemplate(TUpdateTemplateReq req) throws TException {
switch (TemplateInternalRPCUpdateType.getType(req.type)) {
case ADD_TEMPLATE_SET_INFO:
- ClusterTemplateManager.getInstance().updateTemplateSetInfo(req.getTemplateInfo());
+ ClusterTemplateManager.getInstance().addTemplateSetInfo(req.getTemplateInfo());
break;
case INVALIDATE_TEMPLATE_SET_INFO:
ClusterTemplateManager.getInstance().invalidateTemplateSetInfo(req.getTemplateInfo());
break;
+ case ADD_TEMPLATE_PRE_SET_INFO:
+ ClusterTemplateManager.getInstance().addTemplatePreSetInfo(req.getTemplateInfo());
+ break;
+ case COMMIT_TEMPLATE_SET_INFO:
+ ClusterTemplateManager.getInstance().commitTemplatePreSetInfo(req.getTemplateInfo());
+ break;
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
index c26746d859..498e67303b 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
@@ -64,7 +64,7 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest {
File mLogFile =
SystemFileFactory.INSTANCE.getFile(
- schemaRegion.getStorageGroupFullPath()
+ schemaRegion.getDatabaseFullPath()
+ File.separator
+ schemaRegion.getSchemaRegionId().getId(),
MetadataConstant.METADATA_LOG);
@@ -164,7 +164,7 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest {
File mLogFile =
SystemFileFactory.INSTANCE.getFile(
- schemaRegion.getStorageGroupFullPath()
+ schemaRegion.getDatabaseFullPath()
+ File.separator
+ schemaRegion.getSchemaRegionId().getId(),
MetadataConstant.METADATA_LOG);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
index 7adb9b07bc..90f7368504 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
@@ -82,7 +82,7 @@ public class SchemaQueryScanOperatorTest {
1, planNodeId, SchemaQueryScanOperator.class.getSimpleName());
PartialPath partialPath = new PartialPath(META_SCAN_OPERATOR_TEST_SG + ".device0");
ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
- Mockito.when(schemaRegion.getStorageGroupFullPath()).thenReturn(META_SCAN_OPERATOR_TEST_SG);
+ Mockito.when(schemaRegion.getDatabaseFullPath()).thenReturn(META_SCAN_OPERATOR_TEST_SG);
IDeviceSchemaInfo deviceSchemaInfo = Mockito.mock(IDeviceSchemaInfo.class);
Mockito.when(deviceSchemaInfo.getFullPath())
.thenReturn(META_SCAN_OPERATOR_TEST_SG + ".device0");
@@ -190,7 +190,7 @@ public class SchemaQueryScanOperatorTest {
}
ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
- Mockito.when(schemaRegion.getStorageGroupFullPath()).thenReturn(META_SCAN_OPERATOR_TEST_SG);
+ Mockito.when(schemaRegion.getDatabaseFullPath()).thenReturn(META_SCAN_OPERATOR_TEST_SG);
operatorContext.setDriverContext(
new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 78701c0623..3a3b97cb3d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -128,7 +128,13 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
}
@Override
- public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
+ public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath devicePath) {
+ return null;
+ }
+
+ @Override
+ public Pair<Template, PartialPath> checkTemplateSetAndPreSetInfo(
+ PartialPath timeSeriesPath, String alias) {
return null;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index 26462b5be4..7123e2c389 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -324,7 +324,13 @@ public class Util {
}
@Override
- public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
+ public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath devicePath) {
+ return null;
+ }
+
+ @Override
+ public Pair<Template, PartialPath> checkTemplateSetAndPreSetInfo(
+ PartialPath timeSeriesPath, String alias) {
return null;
}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 772fa1b561..dbc5ac6fe2 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -570,8 +570,9 @@ struct TGetTemplateResp {
}
struct TSetSchemaTemplateReq {
- 1: required string name
- 2: required string path
+ 1: required string queryId
+ 2: required string name
+ 3: required string path
}
struct TGetPathsSetTemplatesResp {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 3b8d18de89..73b631487b 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -355,6 +355,16 @@ struct TCountPathsUsingTemplateResp{
2: optional i64 count
}
+struct TCheckTimeSeriesExistenceReq{
+ 1: required binary patternTree
+ 2: required list<common.TConsensusGroupId> schemaRegionIdList
+}
+
+struct TCheckTimeSeriesExistenceResp{
+ 1: required common.TSStatus status
+ 2: optional bool exists
+}
+
struct TCreatePipeOnDataNodeReq{
1: required binary pipeMeta
}
@@ -740,6 +750,8 @@ service IDataNodeRPCService {
TCountPathsUsingTemplateResp countPathsUsingTemplate(TCountPathsUsingTemplateReq req)
+ TCheckTimeSeriesExistenceResp checkTimeSeriesExistence(TCheckTimeSeriesExistenceReq req)
+
/**
* Create PIPE on DataNode
*/