You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:43:59 UTC
[iotdb] 02/08: extract AutoCreateSchemaExecutor and ClusterSchemaFetchExecutor
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aa1f667ac1745c216bbef22fbf9954799ea5b1f3
Author: Marccos <15...@qq.com>
AuthorDate: Mon Dec 12 11:33:19 2022 +0800
extract AutoCreateSchemaExecutor and ClusterSchemaFetchExecutor
---
.../analyze/schema/AutoCreateSchemaExecutor.java | 137 +++++++++++++
.../analyze/schema/ClusterSchemaFetchExecutor.java | 118 +++++++++++
.../plan/analyze/schema/ClusterSchemaFetcher.java | 223 ++++-----------------
3 files changed, 292 insertions(+), 186 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
new file mode 100644
index 0000000000..b5389698b5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Executes the internal statements used for automatic schema creation (timeseries creation and
+ * template activation) and turns their execution status into a result or an exception.
+ *
+ * <p>The actual statement execution is delegated to the function supplied at construction time.
+ */
+class AutoCreateSchemaExecutor {
+
+  private final Function<Statement, ExecutionResult> statementExecutor;
+
+  /**
+   * @param statementExecutor function that runs a statement and returns its execution result
+   */
+  AutoCreateSchemaExecutor(Function<Statement, ExecutionResult> statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  /**
+   * Tries to create the target timeseries and returns a schema tree containing both the
+   * timeseries reported as already existing and the ones requested by this call.
+   *
+   * <p>{@code measurements}, {@code tsDataTypes}, {@code encodings} and {@code compressors} are
+   * read by the same index, so they must have the same length.
+   *
+   * @param devicePath device the measurements belong to
+   * @param measurements names of the measurements to create
+   * @param tsDataTypes data type of each measurement
+   * @param encodings encoding of each measurement
+   * @param compressors compression type of each measurement
+   * @param isAligned passed to the create statement and to every appended measurement
+   * @return schema tree of the existing and the newly created timeseries
+   * @throws RuntimeException if the statement fails as a whole
+   * @throws SemanticException if at least one timeseries could not be created for a reason other
+   *     than already existing
+   */
+  ClusterSchemaTree internalCreateTimeseries(
+      PartialPath devicePath,
+      List<String> measurements,
+      List<TSDataType> tsDataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      boolean isAligned) {
+    List<MeasurementPath> existingMeasurementPaths =
+        executeInternalCreateTimeseriesStatement(
+            new InternalCreateTimeSeriesStatement(
+                devicePath, measurements, tsDataTypes, encodings, compressors, isAligned));
+
+    // Look existing measurements up by name rather than through List.indexOf(): indexOf is a
+    // linear scan per lookup and only reports the first index, so a repeated measurement name
+    // would otherwise be appended again with the requested schema.
+    Set<String> existingMeasurementNames =
+        existingMeasurementPaths.stream()
+            .map(MeasurementPath::getMeasurement)
+            .collect(Collectors.toSet());
+
+    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+    // Existing timeseries keep the schema reported by the execution result.
+    schemaTree.appendMeasurementPaths(existingMeasurementPaths);
+
+    for (int i = 0, size = measurements.size(); i < size; i++) {
+      String measurement = measurements.get(i);
+      if (existingMeasurementNames.contains(measurement)) {
+        continue;
+      }
+
+      // Everything else was created by this call, so the requested schema applies.
+      schemaTree.appendSingleMeasurement(
+          devicePath.concatNode(measurement),
+          new MeasurementSchema(
+              measurement, tsDataTypes.get(i), encodings.get(i), compressors.get(i)),
+          null,
+          null,
+          isAligned);
+    }
+
+    return schemaTree;
+  }
+
+  /**
+   * Runs the auto-create statement and returns the info of the timeseries that already existed.
+   *
+   * @param statement the internal create-timeseries statement to execute
+   * @return the measurement paths parsed from the sub-statuses reporting
+   *     TIMESERIES_ALREADY_EXIST; an empty list if the statement succeeded as a whole
+   * @throws RuntimeException wrapping an {@link IoTDBException} if the status is neither
+   *     SUCCESS_STATUS nor MULTIPLE_ERROR
+   * @throws SemanticException if any sub-status has a code other than TIMESERIES_ALREADY_EXIST
+   */
+  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
+      InternalCreateTimeSeriesStatement statement) {
+
+    ExecutionResult executionResult = statementExecutor.apply(statement);
+
+    int statusCode = executionResult.status.getCode();
+    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return Collections.emptyList();
+    }
+
+    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      throw new RuntimeException(
+          new IoTDBException(executionResult.status.getMessage(), statusCode));
+    }
+
+    Set<String> failedCreationSet = new HashSet<>();
+    List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
+    for (TSStatus subStatus : executionResult.status.getSubStatus()) {
+      if (subStatus.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+        // For this status code the message is parsed as the existing measurement path.
+        alreadyExistingMeasurements.add(
+            MeasurementPath.parseDataFromString(subStatus.getMessage()));
+      } else {
+        failedCreationSet.add(subStatus.getMessage());
+      }
+    }
+
+    if (!failedCreationSet.isEmpty()) {
+      throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
+    }
+
+    return alreadyExistingMeasurements;
+  }
+
+  /**
+   * Executes an {@link ActivateTemplateStatement} for the given device.
+   *
+   * <p>Both SUCCESS_STATUS and TEMPLATE_IS_IN_USE are accepted. NOTE(review): the latter
+   * presumably means the template is already activated on the device; confirm.
+   *
+   * @param devicePath device to activate the template on
+   * @throws RuntimeException wrapping an {@link IoTDBException} for any other status code
+   */
+  void internalActivateTemplate(PartialPath devicePath) {
+    ExecutionResult executionResult =
+        statementExecutor.apply(new ActivateTemplateStatement(devicePath));
+    TSStatus status = executionResult.status;
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode()));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
new file mode 100644
index 0000000000..9b0d3d7c81
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
+/**
+ * Runs a {@link SchemaFetchStatement} as a query and assembles the returned result blocks into a
+ * {@link ClusterSchemaTree}.
+ */
+class ClusterSchemaFetchExecutor {
+
+  private final Coordinator coordinator;
+  private final Supplier<Long> queryIdProvider;
+  private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
+
+  /**
+   * @param coordinator used to look up the query execution by id and to clean it up afterwards
+   * @param queryIdProvider supplies the query id used for each schema fetch
+   * @param statementExecutor runs a statement under the given query id
+   */
+  ClusterSchemaFetchExecutor(
+      Coordinator coordinator,
+      Supplier<Long> queryIdProvider,
+      BiFunction<Long, Statement, ExecutionResult> statementExecutor) {
+    this.coordinator = coordinator;
+    this.queryIdProvider = queryIdProvider;
+    this.statementExecutor = statementExecutor;
+  }
+
+  /**
+   * Executes the schema fetch statement and merges every fetched block into one schema tree.
+   *
+   * @param schemaFetchStatement the statement describing the schema to fetch
+   * @return the merged schema tree, with its databases set to all database names that were read
+   * @throws RuntimeException if the statement does not finish with SUCCESS_STATUS, if reading a
+   *     result batch fails, or if a fetched entry cannot be parsed
+   */
+  ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
+    long queryId = queryIdProvider.get();
+    try {
+      ExecutionResult executionResult = statementExecutor.apply(queryId, schemaFetchStatement);
+      if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException(
+            String.format(
+                "cannot fetch schema, status is: %s, msg is: %s",
+                executionResult.status.getCode(), executionResult.status.getMessage()));
+      }
+      try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
+        ClusterSchemaTree result = new ClusterSchemaTree();
+        Set<String> databaseSet = new HashSet<>();
+        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+          // The query transitions to FINISHED on the last getBatchResult() call, so no manual
+          // cleanup is needed inside this loop.
+          Optional<TsBlock> tsBlock;
+          try {
+            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+          } catch (IoTDBException e) {
+            throw new RuntimeException("Fetch Schema failed. ", e);
+          }
+          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
+            break;
+          }
+          // Only the first column is read; each of its positions holds one binary entry.
+          Column column = tsBlock.get().getColumn(0);
+          for (int i = 0; i < column.getPositionCount(); i++) {
+            parseFetchedData(column.getBinary(i), result, databaseSet);
+          }
+        }
+        result.setDatabases(databaseSet);
+        return result;
+      }
+    } finally {
+      // Runs on both the normal and the exceptional path.
+      coordinator.cleanupQueryExecution(queryId);
+    }
+  }
+
+  /**
+   * Parses one fetched binary entry. The first byte selects the payload kind: {@code 0} is an
+   * int count followed by that many database names, which are added to {@code databaseSet};
+   * {@code 1} is a serialized schema tree, which is merged into {@code resultSchemaTree}.
+   *
+   * @param data the binary entry to parse
+   * @param resultSchemaTree schema tree that fetched trees are merged into
+   * @param databaseSet set that fetched database names are added to
+   * @throws RuntimeException if the type byte is unrecognized or the payload cannot be read
+   */
+  private void parseFetchedData(
+      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
+    InputStream inputStream = new ByteArrayInputStream(data.getValues());
+    try {
+      byte type = ReadWriteIOUtils.readByte(inputStream);
+      if (type == 0) {
+        int size = ReadWriteIOUtils.readInt(inputStream);
+        for (int i = 0; i < size; i++) {
+          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
+        }
+      } else if (type == 1) {
+        resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
+      } else {
+        throw new RuntimeException(
+            new MetadataException("Failed to fetch schema because of unrecognized data"));
+      }
+    } catch (IOException e) {
+      // Reading from an in-memory stream is not expected to fail. If it does, the payload is
+      // malformed, so surface the failure instead of returning a partially built result.
+      throw new RuntimeException("Failed to parse fetched schema data. ", e);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 67fc47d941..3f88d88825 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -18,15 +18,11 @@
*/
package org.apache.iotdb.db.mpp.plan.analyze.schema;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.ITemplateManager;
@@ -36,37 +32,22 @@ import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -77,11 +58,36 @@ import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncodin
public class ClusterSchemaFetcher implements ISchemaFetcher {
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
private final Coordinator coordinator = Coordinator.getInstance();
+
private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
+ // Runs auto-create statements through the coordinator. Each invocation of the lambda requests
+ // its own query id from the SessionManager before calling coordinator.execute(...).
+ private final AutoCreateSchemaExecutor autoCreateSchemaExecutor =
+ new AutoCreateSchemaExecutor(
+ statement ->
+ coordinator.execute(
+ statement,
+ SessionManager.getInstance().requestQueryId(),
+ null,
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ this,
+ config.getQueryTimeoutThreshold()));
+ // Runs schema fetch statements through the coordinator. The supplier hands out query ids from
+ // the SessionManager; the execution lambda receives the query id as an argument instead of
+ // requesting one itself.
+ private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor =
+ new ClusterSchemaFetchExecutor(
+ coordinator,
+ () -> SessionManager.getInstance().requestQueryId(),
+ (queryId, statement) ->
+ coordinator.execute(
+ statement,
+ queryId,
+ null,
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ this,
+ config.getQueryTimeoutThreshold()));
+
private static final class ClusterSchemaFetcherHolder {
private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
@@ -113,7 +119,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
if (withTags) {
- return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+ return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+ new SchemaFetchStatement(patternTree, templateMap, withTags));
}
List<PartialPath> fullPathList = new ArrayList<>();
@@ -124,7 +131,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
if (fullPathList.isEmpty()) {
- return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+ return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+ new SchemaFetchStatement(patternTree, templateMap, withTags));
}
// The schema cache R/W and fetch operation must be locked together thus the cache clean
@@ -148,13 +156,17 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
}
if (isAllCached) {
+ // The iteration order of a HashMap's entries depends in part on their insertion order.
+ // Therefore, we must avoid merging cachedSchemaTree and fetchedSchemaTree,
+ // since the cache state varies among DataNodes.
schemaTree.setDatabases(storageGroupSet);
return schemaTree;
}
}
schemaTree =
- executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+ clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+ new SchemaFetchStatement(patternTree, templateMap, withTags));
// only cache the schema fetched by full path
List<MeasurementPath> measurementPathList;
@@ -172,73 +184,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
}
- private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
- long queryId = SessionManager.getInstance().requestQueryId();
- try {
- ExecutionResult executionResult =
- coordinator.execute(
- schemaFetchStatement,
- queryId,
- null,
- "",
- ClusterPartitionFetcher.getInstance(),
- this,
- config.getQueryTimeoutThreshold());
- if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new RuntimeException(
- String.format(
- "cannot fetch schema, status is: %s, msg is: %s",
- executionResult.status.getCode(), executionResult.status.getMessage()));
- }
- try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
- ClusterSchemaTree result = new ClusterSchemaTree();
- Set<String> databaseSet = new HashSet<>();
- while (coordinator.getQueryExecution(queryId).hasNextResult()) {
- // The query will be transited to FINISHED when invoking getBatchResult() at the last time
- // So we don't need to clean up it manually
- Optional<TsBlock> tsBlock;
- try {
- tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
- } catch (IoTDBException e) {
- throw new RuntimeException("Fetch Schema failed. ", e);
- }
- if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
- break;
- }
- Column column = tsBlock.get().getColumn(0);
- for (int i = 0; i < column.getPositionCount(); i++) {
- parseFetchedData(column.getBinary(i), result, databaseSet);
- }
- }
- result.setDatabases(databaseSet);
- return result;
- }
- } finally {
- coordinator.cleanupQueryExecution(queryId);
- }
- }
-
- private void parseFetchedData(
- Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
- InputStream inputStream = new ByteArrayInputStream(data.getValues());
- try {
- byte type = ReadWriteIOUtils.readByte(inputStream);
- if (type == 0) {
- int size = ReadWriteIOUtils.readInt(inputStream);
- for (int i = 0; i < size; i++) {
- databaseSet.add(ReadWriteIOUtils.readString(inputStream));
- }
- } else if (type == 1) {
- resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
- } else {
- throw new RuntimeException(
- new MetadataException("Failed to fetch schema because of unrecognized data"));
- }
- } catch (IOException e) {
- // Totally memory operation. This case won't happen.
- }
- }
-
@Override
public ISchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath,
@@ -420,7 +365,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
if (shouldActivateTemplate) {
- internalActivateTemplate(devicePath);
+ autoCreateSchemaExecutor.internalActivateTemplate(devicePath);
List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
if (!template.hasSchema(measurements[i])) {
@@ -470,7 +415,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
if (!missingMeasurements.isEmpty()) {
schemaTree.mergeSchemaTree(
- internalCreateTimeseries(
+ autoCreateSchemaExecutor.internalCreateTimeseries(
devicePath,
missingMeasurements,
dataTypesOfMissingMeasurement,
@@ -499,100 +444,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
return indexOfMissingMeasurements;
}
- // try to create the target timeseries and return schemaTree involving successfully created
- // timeseries and existing timeseries
- private ClusterSchemaTree internalCreateTimeseries(
- PartialPath devicePath,
- List<String> measurements,
- List<TSDataType> tsDataTypes,
- List<TSEncoding> encodings,
- List<CompressionType> compressors,
- boolean isAligned) {
- List<MeasurementPath> measurementPathList =
- executeInternalCreateTimeseriesStatement(
- new InternalCreateTimeSeriesStatement(
- devicePath, measurements, tsDataTypes, encodings, compressors, isAligned));
-
- Set<Integer> alreadyExistingMeasurementIndexSet =
- measurementPathList.stream()
- .map(o -> measurements.indexOf(o.getMeasurement()))
- .collect(Collectors.toSet());
-
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- schemaTree.appendMeasurementPaths(measurementPathList);
-
- for (int i = 0, size = measurements.size(); i < size; i++) {
- if (alreadyExistingMeasurementIndexSet.contains(i)) {
- continue;
- }
-
- schemaTree.appendSingleMeasurement(
- devicePath.concatNode(measurements.get(i)),
- new MeasurementSchema(
- measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)),
- null,
- null,
- isAligned);
- }
-
- return schemaTree;
- }
-
- // auto create timeseries and return the existing timeseries info
- private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
- InternalCreateTimeSeriesStatement statement) {
-
- ExecutionResult executionResult = executeStatement(statement);
-
- int statusCode = executionResult.status.getCode();
- if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return Collections.emptyList();
- }
-
- if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- throw new RuntimeException(
- new IoTDBException(executionResult.status.getMessage(), statusCode));
- }
-
- Set<String> failedCreationSet = new HashSet<>();
- List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
- for (TSStatus subStatus : executionResult.status.subStatus) {
- if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- alreadyExistingMeasurements.add(
- MeasurementPath.parseDataFromString(subStatus.getMessage()));
- } else {
- failedCreationSet.add(subStatus.message);
- }
- }
-
- if (!failedCreationSet.isEmpty()) {
- throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
- }
-
- return alreadyExistingMeasurements;
- }
-
- public void internalActivateTemplate(PartialPath devicePath) {
- ExecutionResult executionResult = executeStatement(new ActivateTemplateStatement(devicePath));
- TSStatus status = executionResult.status;
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
- throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode()));
- }
- }
-
- private ExecutionResult executeStatement(Statement statement) {
- long queryId = SessionManager.getInstance().requestQueryId();
- return coordinator.execute(
- statement,
- queryId,
- null,
- "",
- ClusterPartitionFetcher.getInstance(),
- this,
- config.getQueryTimeoutThreshold());
- }
-
@Override
public void invalidAllCache() {
DataNodeSchemaCache.getInstance().cleanUp();