Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:43:59 UTC

[iotdb] 02/08: extract AutoCreateSchemaExecutor and ClusterSchemaFetchExecutor

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aa1f667ac1745c216bbef22fbf9954799ea5b1f3
Author: Marccos <15...@qq.com>
AuthorDate: Mon Dec 12 11:33:19 2022 +0800

    extract AutoCreateSchemaExecutor and ClusterSchemaFetchExecutor
---
 .../analyze/schema/AutoCreateSchemaExecutor.java   | 137 +++++++++++++
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 118 +++++++++++
 .../plan/analyze/schema/ClusterSchemaFetcher.java  | 223 ++++-----------------
 3 files changed, 292 insertions(+), 186 deletions(-)
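
The refactoring follows a simple dependency-injection pattern: instead of calling Coordinator.execute(...) directly, each extracted executor receives statement execution as a java.util.function.Function (or BiFunction), so ClusterSchemaFetcher wires in the coordinator while tests can supply a stub. A minimal self-contained sketch of the pattern, using simplified placeholder types rather than the real IoTDB Statement and ExecutionResult:

    import java.util.function.Function;

    class ExecutorInjectionSketch {
      // Simplified stand-ins for the real IoTDB types.
      static class Statement {
        final String sql;
        Statement(String sql) { this.sql = sql; }
      }

      static class ExecutionResult {
        final int statusCode;
        ExecutionResult(int statusCode) { this.statusCode = statusCode; }
      }

      // The executor depends only on a Function, not on the Coordinator singleton,
      // mirroring the AutoCreateSchemaExecutor constructor in the diff below.
      static class AutoCreateExecutor {
        private final Function<Statement, ExecutionResult> statementExecutor;

        AutoCreateExecutor(Function<Statement, ExecutionResult> statementExecutor) {
          this.statementExecutor = statementExecutor;
        }

        boolean execute(Statement statement) {
          return statementExecutor.apply(statement).statusCode == 0;
        }
      }

      public static void main(String[] args) {
        // Production code injects a coordinator-backed function; a test injects a stub.
        AutoCreateExecutor executor = new AutoCreateExecutor(statement -> new ExecutionResult(0));
        System.out.println(executor.execute(new Statement("CREATE TIMESERIES root.sg.d1.s1")));
      }
    }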

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
new file mode 100644
index 0000000000..b5389698b5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+class AutoCreateSchemaExecutor {
+
+  private final Function<Statement, ExecutionResult> statementExecutor;
+
+  AutoCreateSchemaExecutor(Function<Statement, ExecutionResult> statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  // try to create the target timeseries and return a schemaTree containing the successfully
+  // created timeseries and the already existing ones
+  ClusterSchemaTree internalCreateTimeseries(
+      PartialPath devicePath,
+      List<String> measurements,
+      List<TSDataType> tsDataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      boolean isAligned) {
+    List<MeasurementPath> measurementPathList =
+        executeInternalCreateTimeseriesStatement(
+            new InternalCreateTimeSeriesStatement(
+                devicePath, measurements, tsDataTypes, encodings, compressors, isAligned));
+
+    Set<Integer> alreadyExistingMeasurementIndexSet =
+        measurementPathList.stream()
+            .map(o -> measurements.indexOf(o.getMeasurement()))
+            .collect(Collectors.toSet());
+
+    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+    schemaTree.appendMeasurementPaths(measurementPathList);
+
+    for (int i = 0, size = measurements.size(); i < size; i++) {
+      if (alreadyExistingMeasurementIndexSet.contains(i)) {
+        continue;
+      }
+
+      schemaTree.appendSingleMeasurement(
+          devicePath.concatNode(measurements.get(i)),
+          new MeasurementSchema(
+              measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)),
+          null,
+          null,
+          isAligned);
+    }
+
+    return schemaTree;
+  }
+
+  // auto-create the timeseries and return the info of those that already exist
+  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
+      InternalCreateTimeSeriesStatement statement) {
+
+    ExecutionResult executionResult = statementExecutor.apply(statement);
+
+    int statusCode = executionResult.status.getCode();
+    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return Collections.emptyList();
+    }
+
+    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      throw new RuntimeException(
+          new IoTDBException(executionResult.status.getMessage(), statusCode));
+    }
+
+    Set<String> failedCreationSet = new HashSet<>();
+    List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
+    for (TSStatus subStatus : executionResult.status.subStatus) {
+      if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+        alreadyExistingMeasurements.add(
+            MeasurementPath.parseDataFromString(subStatus.getMessage()));
+      } else {
+        failedCreationSet.add(subStatus.message);
+      }
+    }
+
+    if (!failedCreationSet.isEmpty()) {
+      throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
+    }
+
+    return alreadyExistingMeasurements;
+  }
+
+  void internalActivateTemplate(PartialPath devicePath) {
+    ExecutionResult executionResult =
+        statementExecutor.apply(new ActivateTemplateStatement(devicePath));
+    TSStatus status = executionResult.status;
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode()));
+    }
+  }
+}
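
Note on executeInternalCreateTimeseriesStatement above: a MULTIPLE_ERROR result is treated as partially expected. Sub-statuses with TIMESERIES_ALREADY_EXIST are collected and returned as already existing schema, while any other sub-status fails the whole operation. A minimal sketch of this partitioning idiom, with placeholder status codes and plain strings standing in for the real TSStatus and MeasurementPath:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class SubStatusPartitionSketch {
      // Placeholder codes; the real values come from TSStatusCode.
      static final int ALREADY_EXIST = 1;

      static class SubStatus {
        final int code;
        final String message;
        SubStatus(int code, String message) { this.code = code; this.message = message; }
      }

      // Tolerate "already exists"; collect everything else and fail if any remain.
      static List<String> collectExisting(List<SubStatus> subStatuses) {
        Set<String> failedCreationSet = new HashSet<>();
        List<String> alreadyExisting = new ArrayList<>();
        for (SubStatus subStatus : subStatuses) {
          if (subStatus.code == ALREADY_EXIST) {
            alreadyExisting.add(subStatus.message);
          } else {
            failedCreationSet.add(subStatus.message);
          }
        }
        if (!failedCreationSet.isEmpty()) {
          throw new IllegalStateException(String.join("; ", failedCreationSet));
        }
        return alreadyExisting;
      }

      public static void main(String[] args) {
        List<SubStatus> statuses = Arrays.asList(new SubStatus(ALREADY_EXIST, "root.sg.d1.s1"));
        System.out.println(collectExisting(statuses)); // [root.sg.d1.s1]
      }
    }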
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
new file mode 100644
index 0000000000..9b0d3d7c81
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
+class ClusterSchemaFetchExecutor {
+
+  private final Coordinator coordinator;
+  private final Supplier<Long> queryIdProvider;
+  private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
+
+  ClusterSchemaFetchExecutor(
+      Coordinator coordinator,
+      Supplier<Long> queryIdProvider,
+      BiFunction<Long, Statement, ExecutionResult> statementExecutor) {
+    this.coordinator = coordinator;
+    this.queryIdProvider = queryIdProvider;
+    this.statementExecutor = statementExecutor;
+  }
+
+  ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
+    long queryId = queryIdProvider.get();
+    try {
+      ExecutionResult executionResult = statementExecutor.apply(queryId, schemaFetchStatement);
+      if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException(
+            String.format(
+                "cannot fetch schema, status is: %s, msg is: %s",
+                executionResult.status.getCode(), executionResult.status.getMessage()));
+      }
+      try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
+        ClusterSchemaTree result = new ClusterSchemaTree();
+        Set<String> databaseSet = new HashSet<>();
+        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+          // The query transitions to FINISHED when getBatchResult() is invoked for the last time,
+          // so we don't need to clean it up manually
+          Optional<TsBlock> tsBlock;
+          try {
+            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+          } catch (IoTDBException e) {
+            throw new RuntimeException("Fetch Schema failed. ", e);
+          }
+          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
+            break;
+          }
+          Column column = tsBlock.get().getColumn(0);
+          for (int i = 0; i < column.getPositionCount(); i++) {
+            parseFetchedData(column.getBinary(i), result, databaseSet);
+          }
+        }
+        result.setDatabases(databaseSet);
+        return result;
+      }
+    } finally {
+      coordinator.cleanupQueryExecution(queryId);
+    }
+  }
+
+  private void parseFetchedData(
+      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
+    InputStream inputStream = new ByteArrayInputStream(data.getValues());
+    try {
+      byte type = ReadWriteIOUtils.readByte(inputStream);
+      if (type == 0) {
+        int size = ReadWriteIOUtils.readInt(inputStream);
+        for (int i = 0; i < size; i++) {
+          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
+        }
+      } else if (type == 1) {
+        resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
+      } else {
+        throw new RuntimeException(
+            new MetadataException("Failed to fetch schema because of unrecognized data"));
+      }
+    } catch (IOException e) {
+      // This is a purely in-memory operation, so this exception cannot occur.
+    }
+  }
+}
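
Note on parseFetchedData above: each fetched Binary payload starts with a one-byte type tag. Type 0 carries a list of database names (an int count followed by that many strings); type 1 carries a serialized ClusterSchemaTree to merge. A minimal sketch of the same tag-dispatch layout using plain java.io streams (this only models the structure; the actual byte encoding is defined by ReadWriteIOUtils and ClusterSchemaTree serialization):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    class TagDispatchSketch {
      public static void main(String[] args) throws IOException {
        // Encode a type-0 payload: tag byte, count, then the database names.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeByte(0);
        out.writeInt(2);
        out.writeUTF("root.sg1");
        out.writeUTF("root.sg2");

        // Decode: read the tag first, then branch on it.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        byte type = in.readByte();
        if (type == 0) {
          Set<String> databaseSet = new HashSet<>();
          int size = in.readInt();
          for (int i = 0; i < size; i++) {
            databaseSet.add(in.readUTF());
          }
          System.out.println(databaseSet); // [root.sg1, root.sg2]
        } else if (type == 1) {
          // Real code: resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(in));
        } else {
          throw new IOException("unrecognized payload type: " + type);
        }
      }
    }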
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 67fc47d941..3f88d88825 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -18,15 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.ITemplateManager;
@@ -36,37 +32,22 @@ import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -77,11 +58,36 @@ import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncodin
 public class ClusterSchemaFetcher implements ISchemaFetcher {
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
   private final Coordinator coordinator = Coordinator.getInstance();
+
   private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
   private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
 
+  private final AutoCreateSchemaExecutor autoCreateSchemaExecutor =
+      new AutoCreateSchemaExecutor(
+          statement ->
+              coordinator.execute(
+                  statement,
+                  SessionManager.getInstance().requestQueryId(),
+                  null,
+                  "",
+                  ClusterPartitionFetcher.getInstance(),
+                  this,
+                  config.getQueryTimeoutThreshold()));
+  private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor =
+      new ClusterSchemaFetchExecutor(
+          coordinator,
+          () -> SessionManager.getInstance().requestQueryId(),
+          (queryId, statement) ->
+              coordinator.execute(
+                  statement,
+                  queryId,
+                  null,
+                  "",
+                  ClusterPartitionFetcher.getInstance(),
+                  this,
+                  config.getQueryTimeoutThreshold()));
+
   private static final class ClusterSchemaFetcherHolder {
     private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
 
@@ -113,7 +119,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
 
     if (withTags) {
-      return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+      return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+          new SchemaFetchStatement(patternTree, templateMap, withTags));
     }
 
     List<PartialPath> fullPathList = new ArrayList<>();
@@ -124,7 +131,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
 
     if (fullPathList.isEmpty()) {
-      return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+      return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+          new SchemaFetchStatement(patternTree, templateMap, withTags));
     }
 
     // The schema cache R/W and fetch operation must be locked together thus the cache clean
@@ -148,13 +156,17 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
           }
         }
         if (isAllCached) {
+          // The entry iteration order of a HashMap is partly determined by insertion order.
+          // Therefore, we must avoid merging cachedSchemaTree and fetchedSchemaTree,
+          // since the cache state varies among DataNodes.
           schemaTree.setDatabases(storageGroupSet);
           return schemaTree;
         }
       }
 
       schemaTree =
-          executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+          clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+              new SchemaFetchStatement(patternTree, templateMap, withTags));
 
       // only cache the schema fetched by full path
       List<MeasurementPath> measurementPathList;
@@ -172,73 +184,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
   }
 
-  private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
-    long queryId = SessionManager.getInstance().requestQueryId();
-    try {
-      ExecutionResult executionResult =
-          coordinator.execute(
-              schemaFetchStatement,
-              queryId,
-              null,
-              "",
-              ClusterPartitionFetcher.getInstance(),
-              this,
-              config.getQueryTimeoutThreshold());
-      if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        throw new RuntimeException(
-            String.format(
-                "cannot fetch schema, status is: %s, msg is: %s",
-                executionResult.status.getCode(), executionResult.status.getMessage()));
-      }
-      try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
-        ClusterSchemaTree result = new ClusterSchemaTree();
-        Set<String> databaseSet = new HashSet<>();
-        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
-          // The query will be transited to FINISHED when invoking getBatchResult() at the last time
-          // So we don't need to clean up it manually
-          Optional<TsBlock> tsBlock;
-          try {
-            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
-          } catch (IoTDBException e) {
-            throw new RuntimeException("Fetch Schema failed. ", e);
-          }
-          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
-            break;
-          }
-          Column column = tsBlock.get().getColumn(0);
-          for (int i = 0; i < column.getPositionCount(); i++) {
-            parseFetchedData(column.getBinary(i), result, databaseSet);
-          }
-        }
-        result.setDatabases(databaseSet);
-        return result;
-      }
-    } finally {
-      coordinator.cleanupQueryExecution(queryId);
-    }
-  }
-
-  private void parseFetchedData(
-      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
-    InputStream inputStream = new ByteArrayInputStream(data.getValues());
-    try {
-      byte type = ReadWriteIOUtils.readByte(inputStream);
-      if (type == 0) {
-        int size = ReadWriteIOUtils.readInt(inputStream);
-        for (int i = 0; i < size; i++) {
-          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
-        }
-      } else if (type == 1) {
-        resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
-      } else {
-        throw new RuntimeException(
-            new MetadataException("Failed to fetch schema because of unrecognized data"));
-      }
-    } catch (IOException e) {
-      // Totally memory operation. This case won't happen.
-    }
-  }
-
   @Override
   public ISchemaTree fetchSchemaWithAutoCreate(
       PartialPath devicePath,
@@ -420,7 +365,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       }
 
       if (shouldActivateTemplate) {
-        internalActivateTemplate(devicePath);
+        autoCreateSchemaExecutor.internalActivateTemplate(devicePath);
         List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
         for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
           if (!template.hasSchema(measurements[i])) {
@@ -470,7 +415,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
     if (!missingMeasurements.isEmpty()) {
       schemaTree.mergeSchemaTree(
-          internalCreateTimeseries(
+          autoCreateSchemaExecutor.internalCreateTimeseries(
               devicePath,
               missingMeasurements,
               dataTypesOfMissingMeasurement,
@@ -499,100 +444,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     return indexOfMissingMeasurements;
   }
 
-  // try to create the target timeseries and return schemaTree involving successfully created
-  // timeseries and existing timeseries
-  private ClusterSchemaTree internalCreateTimeseries(
-      PartialPath devicePath,
-      List<String> measurements,
-      List<TSDataType> tsDataTypes,
-      List<TSEncoding> encodings,
-      List<CompressionType> compressors,
-      boolean isAligned) {
-    List<MeasurementPath> measurementPathList =
-        executeInternalCreateTimeseriesStatement(
-            new InternalCreateTimeSeriesStatement(
-                devicePath, measurements, tsDataTypes, encodings, compressors, isAligned));
-
-    Set<Integer> alreadyExistingMeasurementIndexSet =
-        measurementPathList.stream()
-            .map(o -> measurements.indexOf(o.getMeasurement()))
-            .collect(Collectors.toSet());
-
-    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-    schemaTree.appendMeasurementPaths(measurementPathList);
-
-    for (int i = 0, size = measurements.size(); i < size; i++) {
-      if (alreadyExistingMeasurementIndexSet.contains(i)) {
-        continue;
-      }
-
-      schemaTree.appendSingleMeasurement(
-          devicePath.concatNode(measurements.get(i)),
-          new MeasurementSchema(
-              measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)),
-          null,
-          null,
-          isAligned);
-    }
-
-    return schemaTree;
-  }
-
-  // auto create timeseries and return the existing timeseries info
-  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
-      InternalCreateTimeSeriesStatement statement) {
-
-    ExecutionResult executionResult = executeStatement(statement);
-
-    int statusCode = executionResult.status.getCode();
-    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return Collections.emptyList();
-    }
-
-    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      throw new RuntimeException(
-          new IoTDBException(executionResult.status.getMessage(), statusCode));
-    }
-
-    Set<String> failedCreationSet = new HashSet<>();
-    List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
-    for (TSStatus subStatus : executionResult.status.subStatus) {
-      if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-        alreadyExistingMeasurements.add(
-            MeasurementPath.parseDataFromString(subStatus.getMessage()));
-      } else {
-        failedCreationSet.add(subStatus.message);
-      }
-    }
-
-    if (!failedCreationSet.isEmpty()) {
-      throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
-    }
-
-    return alreadyExistingMeasurements;
-  }
-
-  public void internalActivateTemplate(PartialPath devicePath) {
-    ExecutionResult executionResult = executeStatement(new ActivateTemplateStatement(devicePath));
-    TSStatus status = executionResult.status;
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
-      throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode()));
-    }
-  }
-
-  private ExecutionResult executeStatement(Statement statement) {
-    long queryId = SessionManager.getInstance().requestQueryId();
-    return coordinator.execute(
-        statement,
-        queryId,
-        null,
-        "",
-        ClusterPartitionFetcher.getInstance(),
-        this,
-        config.getQueryTimeoutThreshold());
-  }
-
   @Override
   public void invalidAllCache() {
     DataNodeSchemaCache.getInstance().cleanUp();