You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/08/10 14:01:38 UTC
[iotdb] branch master updated: [IOTDB-3938] Optimize Standalone schema fetch performance (#6949)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f1787b83c5 [IOTDB-3938] Optimize Standalone schema fetch performance (#6949)
f1787b83c5 is described below
commit f1787b83c5cdb98262d7e8d324c8c8f8b812f1b5
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Wed Aug 10 22:01:33 2022 +0800
[IOTDB-3938] Optimize Standalone schema fetch performance (#6949)
[IOTDB-3938] Optimize Standalone schema fetch performance (#6949)
---
.../schemaregion/rocksdb/RSchemaRegion.java | 8 +
.../db/metadata/schemaregion/ISchemaRegion.java | 6 +
.../schemaregion/SchemaRegionMemoryImpl.java | 45 ++++
.../schemaregion/SchemaRegionSchemaFileImpl.java | 8 +
.../mpp/common/schematree/ClusterSchemaTree.java | 15 +-
.../common/schematree/DeviceGroupSchemaTree.java | 98 +++++++++
.../db/mpp/common/schematree/DeviceSchemaInfo.java | 110 +++++++---
.../common/schematree/MeasurementSchemaInfo.java | 53 +++++
.../visitor/SchemaTreeDeviceVisitor.java | 13 +-
.../mpp/plan/analyze/StandaloneSchemaFetcher.java | 231 +++++----------------
10 files changed, 372 insertions(+), 215 deletions(-)
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 574bd21ca6..686b0f0b8f 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -55,6 +55,7 @@ import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeValueType;
import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMeasurementMNode;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -1921,6 +1922,13 @@ public class RSchemaRegion implements ISchemaRegion {
return deviceMNode;
}
+ @Override
+ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+ throws MetadataException {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void clear() {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 700eb6f4d7..afa6bbfaea 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
@@ -42,6 +43,7 @@ import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import java.io.File;
@@ -354,6 +356,10 @@ public interface ISchemaRegion {
// region Interfaces for InsertPlan process
/** get schema for device. Attention!!! Only support insertPlan */
IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) throws MetadataException, IOException;
+
+ DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+ throws MetadataException;
// endregion
// region Interfaces for Template operations
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 78754414eb..0a8bac1ad2 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -56,6 +56,8 @@ import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.MeasurementSchemaInfo;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -85,6 +87,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -1744,6 +1747,48 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings, compressors);
}
+ @Override
+ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+ throws MetadataException {
+ try {
+ List<MeasurementSchemaInfo> measurementSchemaInfoList = new ArrayList<>(measurements.length);
+ IMNode deviceMNode = getDeviceNodeWithAutoCreate(devicePath);
+ IMeasurementMNode measurementMNode;
+ for (int i = 0; i < measurements.length; i++) {
+ measurementMNode = getMeasurementMNode(deviceMNode, measurements[i]);
+ if (measurementMNode == null) {
+ if (config.isAutoCreateSchemaEnabled()) {
+ if (aligned) {
+ internalAlignedCreateTimeseries(
+ devicePath,
+ Collections.singletonList(measurements[i]),
+ Collections.singletonList(tsDataTypes[i]));
+
+ } else {
+ internalCreateTimeseries(devicePath.concatNode(measurements[i]), tsDataTypes[i]);
+ }
+ // after creating timeseries, the deviceMNode has been replaced by a new entityMNode
+ deviceMNode = mtree.getNodeByPath(devicePath);
+ measurementMNode = getMeasurementMNode(deviceMNode, measurements[i]);
+ } else {
+ throw new PathNotExistException(devicePath + PATH_SEPARATOR + measurements[i]);
+ }
+ }
+ measurementSchemaInfoList.add(
+ new MeasurementSchemaInfo(
+ measurementMNode.getName(),
+ (MeasurementSchema) measurementMNode.getSchema(),
+ measurementMNode.getAlias()));
+ }
+
+ return new DeviceSchemaInfo(
+ devicePath, deviceMNode.getAsEntityMNode().isAligned(), measurementSchemaInfoList);
+ } catch (IOException e) {
+ throw new MetadataException(e);
+ }
+ }
+
// endregion
// region Interfaces and Implementation for Template operations
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index a49e3767de..c68aa3ded4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -1606,6 +1607,13 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
return deviceMNode;
}
+ @Override
+ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned)
+ throws MetadataException {
+ throw new UnsupportedOperationException();
+ }
+
private IMNode getDeviceInTemplateIfUsingTemplate(
PartialPath devicePath, String[] measurementList) throws MetadataException, IOException {
// 1. get device node, set using template if accessed.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index f4d23b628a..ae5e7b97b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -128,18 +128,25 @@ public class ClusterSchemaTree implements ISchemaTree {
return null;
}
- List<SchemaMeasurementNode> measurementNodeList = new ArrayList<>();
+ List<MeasurementSchemaInfo> measurementSchemaInfoList = new ArrayList<>();
SchemaNode node;
+ SchemaMeasurementNode measurementNode;
for (String measurement : measurements) {
node = cur.getChild(measurement);
if (node == null) {
- measurementNodeList.add(null);
+ measurementSchemaInfoList.add(null);
} else {
- measurementNodeList.add(node.getAsMeasurementNode());
+ measurementNode = node.getAsMeasurementNode();
+ measurementSchemaInfoList.add(
+ new MeasurementSchemaInfo(
+ measurementNode.getName(),
+ measurementNode.getSchema(),
+ measurementNode.getAlias()));
}
}
- return new DeviceSchemaInfo(devicePath, cur.getAsEntityNode().isAligned(), measurementNodeList);
+ return new DeviceSchemaInfo(
+ devicePath, cur.getAsEntityNode().isAligned(), measurementSchemaInfoList);
}
public void appendMeasurementPaths(List<MeasurementPath> measurementPathList) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java
new file mode 100644
index 0000000000..955f9e10e6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.common.schematree;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is specifically for standalone schema validation during data insertion. Since the
+ * schema fetch is mainly based on device path, the schema is directly grouped by device rather than
+ * organized as a trie.
+ */
+public class DeviceGroupSchemaTree implements ISchemaTree {
+
+ private final Map<PartialPath, DeviceSchemaInfo> deviceSchemaInfoMap = new HashMap<>();
+
+  /**
+   * Looks up the schema of the requested measurements of one device.
+   *
+   * @param devicePath path of the device to look up
+   * @param measurements measurement names (or aliases) to resolve under that device
+   * @return the matching sub-schema, or {@code null} if the device is not in this tree
+   */
+  @Override
+  public DeviceSchemaInfo searchDeviceSchemaInfo(
+      PartialPath devicePath, List<String> measurements) {
+    DeviceSchemaInfo deviceSchemaInfo = deviceSchemaInfoMap.get(devicePath);
+    // A device that was never added yields null rather than a NullPointerException.
+    return deviceSchemaInfo == null
+        ? null
+        : deviceSchemaInfo.getSubDeviceSchemaInfo(measurements);
+  }
+
+ @Override
+ public boolean isEmpty() {
+ return deviceSchemaInfoMap.isEmpty();
+ }
+
+ public void addDeviceInfo(DeviceSchemaInfo deviceSchemaInfo) {
+ deviceSchemaInfoMap.put(deviceSchemaInfo.getDevicePath(), deviceSchemaInfo);
+ }
+
+ public void merge(DeviceGroupSchemaTree schemaTree) {
+ deviceSchemaInfoMap.putAll(schemaTree.deviceSchemaInfoMap);
+ }
+
+ @Override
+ public Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(
+ PartialPath pathPattern, int slimit, int soffset, boolean isPrefixMatch) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(PartialPath pathPattern) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<MeasurementPath> getAllMeasurement() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<DeviceSchemaInfo> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<DeviceSchemaInfo> getMatchedDevices(PartialPath pathPattern) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getBelongedStorageGroup(String pathName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getBelongedStorageGroup(PartialPath path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> getStorageGroups() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java
index 15d76a0cb2..14b3c62e17 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java
@@ -23,25 +23,33 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class DeviceSchemaInfo {
- private final PartialPath devicePath;
- private final boolean isAligned;
- private final List<SchemaMeasurementNode> measurementNodeList;
+ private PartialPath devicePath;
+ private boolean isAligned;
+ private List<MeasurementSchemaInfo> measurementSchemaInfoList;
+
+ private Map<String, MeasurementSchemaInfo> measurementSchemaInfoMap;
+ private Map<String, MeasurementSchemaInfo> aliasMap;
+
+ private DeviceSchemaInfo() {}
public DeviceSchemaInfo(
- PartialPath devicePath, boolean isAligned, List<SchemaMeasurementNode> measurementNodeList) {
+ PartialPath devicePath,
+ boolean isAligned,
+ List<MeasurementSchemaInfo> measurementSchemaInfoList) {
this.devicePath = devicePath;
this.isAligned = isAligned;
- this.measurementNodeList = measurementNodeList;
+ this.measurementSchemaInfoList = measurementSchemaInfoList;
}
public PartialPath getDevicePath() {
@@ -52,26 +60,72 @@ public class DeviceSchemaInfo {
return isAligned;
}
+  /**
+   * Builds a view of this device restricted to the requested measurements.
+   *
+   * <p>Each requested name is resolved first as a measurement name and then as an alias. The
+   * returned list holds exactly one entry per requested measurement, in request order, with
+   * {@code null} marking a measurement that this device does not contain.
+   *
+   * @param measurements measurement names or aliases to look up
+   * @return a new DeviceSchemaInfo carrying the same device path and alignment
+   */
+  public DeviceSchemaInfo getSubDeviceSchemaInfo(List<String> measurements) {
+    DeviceSchemaInfo result = new DeviceSchemaInfo();
+    result.devicePath = devicePath;
+    result.isAligned = isAligned;
+    List<MeasurementSchemaInfo> desiredMeasurementSchemaInfoList =
+        new ArrayList<>(measurements.size());
+
+    // The name and alias lookup maps are built lazily on first use.
+    if (measurementSchemaInfoMap == null) {
+      constructMap();
+    }
+
+    MeasurementSchemaInfo measurementSchemaInfo;
+    for (String measurement : measurements) {
+      measurementSchemaInfo = measurementSchemaInfoMap.get(measurement);
+      if (measurementSchemaInfo == null) {
+        measurementSchemaInfo = aliasMap.get(measurement);
+      }
+
+      // Add exactly once: a miss is already null here, and a second add would shift every
+      // following entry out of alignment with the requested measurements.
+      desiredMeasurementSchemaInfoList.add(measurementSchemaInfo);
+    }
+
+    result.measurementSchemaInfoList = desiredMeasurementSchemaInfoList;
+    return result;
+  }
+
+ private void constructMap() {
+ measurementSchemaInfoMap = new HashMap<>();
+ aliasMap = new HashMap<>();
+ measurementSchemaInfoList.forEach(
+ measurementSchemaInfo -> {
+ if (measurementSchemaInfo == null) {
+ return;
+ }
+ measurementSchemaInfoMap.put(measurementSchemaInfo.getName(), measurementSchemaInfo);
+ if (measurementSchemaInfo.getAlias() != null) {
+ aliasMap.put(measurementSchemaInfo.getAlias(), measurementSchemaInfo);
+ }
+ });
+ }
+
public List<MeasurementSchema> getMeasurementSchemaList() {
- return measurementNodeList.stream()
- .map(measurementNode -> measurementNode == null ? null : measurementNode.getSchema())
+ return measurementSchemaInfoList.stream()
+ .map(
+ measurementSchemaInfo ->
+ measurementSchemaInfo == null ? null : measurementSchemaInfo.getSchema())
.collect(Collectors.toList());
}
public List<MeasurementPath> getMeasurements(Set<String> measurements) {
if (measurements.contains(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
- return measurementNodeList.stream()
+ return measurementSchemaInfoList.stream()
.map(
- measurementNode -> {
- if (measurementNode == null) {
+ measurementInfo -> {
+ if (measurementInfo == null) {
return null;
}
MeasurementPath measurementPath =
new MeasurementPath(
- devicePath.concatNode(measurementNode.getName()),
- measurementNode.getSchema());
- if (measurementNode.getAlias() != null) {
- measurementPath.setMeasurementAlias(measurementNode.getAlias());
+ devicePath.concatNode(measurementInfo.getName()),
+ measurementInfo.getSchema());
+ if (measurementInfo.getAlias() != null) {
+ measurementPath.setMeasurementAlias(measurementInfo.getAlias());
}
measurementPath.setUnderAlignedEntity(isAligned);
return measurementPath;
@@ -79,16 +133,17 @@ public class DeviceSchemaInfo {
.collect(Collectors.toList());
}
List<MeasurementPath> measurementPaths = new ArrayList<>();
- for (SchemaMeasurementNode measurementNode : measurementNodeList) {
+ for (MeasurementSchemaInfo measurementSchemaInfo : measurementSchemaInfoList) {
MeasurementPath measurementPath =
new MeasurementPath(
- devicePath.concatNode(measurementNode.getName()), measurementNode.getSchema());
+ devicePath.concatNode(measurementSchemaInfo.getName()),
+ measurementSchemaInfo.getSchema());
measurementPath.setUnderAlignedEntity(isAligned);
- if (measurements.contains(measurementNode.getName())) {
+ if (measurements.contains(measurementSchemaInfo.getName())) {
measurementPaths.add(measurementPath);
- } else if (measurementNode.getAlias() != null
- && measurements.contains(measurementNode.getAlias())) {
- measurementPath.setMeasurementAlias(measurementNode.getAlias());
+ } else if (measurementSchemaInfo.getAlias() != null
+ && measurements.contains(measurementSchemaInfo.getAlias())) {
+ measurementPath.setMeasurementAlias(measurementSchemaInfo.getAlias());
measurementPaths.add(measurementPath);
}
}
@@ -96,16 +151,17 @@ public class DeviceSchemaInfo {
}
public MeasurementPath getPathByMeasurement(String measurementName) {
- for (SchemaMeasurementNode measurementNode : measurementNodeList) {
+ for (MeasurementSchemaInfo measurementSchemaInfo : measurementSchemaInfoList) {
MeasurementPath measurementPath =
new MeasurementPath(
- devicePath.concatNode(measurementNode.getName()), measurementNode.getSchema());
+ devicePath.concatNode(measurementSchemaInfo.getName()),
+ measurementSchemaInfo.getSchema());
measurementPath.setUnderAlignedEntity(isAligned);
- if (measurementNode.getName().equals(measurementName)) {
+ if (measurementSchemaInfo.getName().equals(measurementName)) {
return measurementPath;
- } else if (measurementNode.getAlias() != null
- && measurementNode.getAlias().equals(measurementName)) {
- measurementPath.setMeasurementAlias(measurementNode.getAlias());
+ } else if (measurementSchemaInfo.getAlias() != null
+ && measurementSchemaInfo.getAlias().equals(measurementName)) {
+ measurementPath.setMeasurementAlias(measurementSchemaInfo.getAlias());
return measurementPath;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/MeasurementSchemaInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/MeasurementSchemaInfo.java
new file mode 100644
index 0000000000..bf000c7e5b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/MeasurementSchemaInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.common.schematree;
+
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+/**
+ * This class acts as common measurement schema format during system module interactions, mainly in
+ * analyzer and SchemaFetcher. Currently, this class cooperates with DeviceSchemaInfo and wraps
+ * measurement name, alias and MeasurementSchema, which are necessary to construct schemaTree for
+ * Query and Insertion.
+ */
+public class MeasurementSchemaInfo {
+
+ private final String name;
+ private final String alias;
+ private final MeasurementSchema schema;
+
+ public MeasurementSchemaInfo(String name, MeasurementSchema schema, String alias) {
+ this.name = name;
+ this.schema = schema;
+ this.alias = alias;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public MeasurementSchema getSchema() {
+ return schema;
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeDeviceVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeDeviceVisitor.java
index 6379a7ada3..4f7ef159d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeDeviceVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeDeviceVisitor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.common.schematree.visitor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.MeasurementSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
@@ -50,17 +51,23 @@ public class SchemaTreeDeviceVisitor extends SchemaTreeVisitor<DeviceSchemaInfo>
@Override
protected DeviceSchemaInfo generateResult() {
PartialPath path = new PartialPath(generateFullPathNodes(nextMatchedNode));
- List<SchemaMeasurementNode> measurementNodeList = new ArrayList<>();
+ List<MeasurementSchemaInfo> measurementSchemaInfoList = new ArrayList<>();
Iterator<SchemaNode> iterator = getChildrenIterator(nextMatchedNode);
SchemaNode node;
+ SchemaMeasurementNode measurementNode;
while (iterator.hasNext()) {
node = iterator.next();
if (node.isMeasurement()) {
- measurementNodeList.add(node.getAsMeasurementNode());
+ measurementNode = node.getAsMeasurementNode();
+ measurementSchemaInfoList.add(
+ new MeasurementSchemaInfo(
+ measurementNode.getName(),
+ measurementNode.getSchema(),
+ measurementNode.getAlias()));
}
}
return new DeviceSchemaInfo(
- path, nextMatchedNode.getAsEntityNode().isAligned(), measurementNodeList);
+ path, nextMatchedNode.getAsEntityNode().isAligned(), measurementSchemaInfoList);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index 86a30c1f3e..eea35fc91b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -23,32 +23,26 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceGroupSchemaTree;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-
public class StandaloneSchemaFetcher implements ISchemaFetcher {
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -88,35 +82,23 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
@Override
public ISchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-
- PathPatternTree patternTree = new PathPatternTree();
- for (String measurement : measurements) {
- patternTree.appendFullPath(devicePath, measurement);
- }
-
- if (patternTree.isEmpty()) {
- return schemaTree;
- }
-
- ClusterSchemaTree fetchedSchemaTree;
+ DeviceSchemaInfo deviceSchemaInfo =
+ getDeviceSchemaInfoWithAutoCreate(devicePath, measurements, tsDataTypes, aligned);
+ DeviceGroupSchemaTree schemaTree = new DeviceGroupSchemaTree();
+ schemaTree.addDeviceInfo(deviceSchemaInfo);
+ return schemaTree;
+ }
- if (!config.isAutoCreateSchemaEnabled()) {
- fetchedSchemaTree = fetchSchema(patternTree);
- schemaTree.mergeSchemaTree(fetchedSchemaTree);
- return schemaTree;
+ private DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
+ try {
+ SchemaRegionId schemaRegionId = localConfigNode.getBelongedSchemaRegionId(devicePath);
+ ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
+ return schemaRegion.getDeviceSchemaInfoWithAutoCreate(
+ devicePath, measurements, tsDataTypes, aligned);
+ } catch (MetadataException e) {
+ throw new RuntimeException(e);
}
-
- fetchedSchemaTree = fetchSchema(patternTree);
- schemaTree.mergeSchemaTree(fetchedSchemaTree);
-
- ClusterSchemaTree missingSchemaTree =
- checkAndAutoCreateMissingMeasurements(
- fetchedSchemaTree, devicePath, measurements, tsDataTypes, aligned);
-
- schemaTree.mergeSchemaTree(missingSchemaTree);
-
- return schemaTree;
}
@Override
@@ -125,40 +107,44 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
List<String[]> measurementsList,
List<TSDataType[]> tsDataTypesList,
List<Boolean> isAlignedList) {
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- PathPatternTree patternTree = new PathPatternTree();
- for (int i = 0; i < devicePathList.size(); i++) {
- for (String measurement : measurementsList.get(i)) {
- patternTree.appendFullPath(devicePathList.get(i), measurement);
- }
- }
-
- if (patternTree.isEmpty()) {
- return schemaTree;
+ Map<PartialPath, List<Integer>> deviceMap = new HashMap<>();
+ for (int i = 0, size = devicePathList.size(); i < size; i++) {
+ deviceMap.computeIfAbsent(devicePathList.get(i), k -> new ArrayList<>()).add(i);
}
- ClusterSchemaTree fetchedSchemaTree;
+ DeviceGroupSchemaTree schemaTree = new DeviceGroupSchemaTree();
- if (!config.isAutoCreateSchemaEnabled()) {
- fetchedSchemaTree = fetchSchema(patternTree);
- schemaTree.mergeSchemaTree(fetchedSchemaTree);
- return schemaTree;
- }
+ for (Map.Entry<PartialPath, List<Integer>> entry : deviceMap.entrySet()) {
+ int totalSize = 0;
+ boolean isAligned = isAlignedList.get(entry.getValue().get(0));
+      // Each index is a position in the original input lists, so it is used directly to look up
+      // both the alignment flag and the measurements of that insertion.
+      for (int index : entry.getValue()) {
+        if (isAlignedList.get(index) != isAligned) {
+          throw new StatementAnalyzeException(
+              String.format("Inconsistent device alignment of %s in insert plan.", entry.getKey()));
+        }
+        totalSize += measurementsList.get(index).length;
+      }
- fetchedSchemaTree = fetchSchema(patternTree);
- schemaTree.mergeSchemaTree(fetchedSchemaTree);
+ String[] measurements = new String[totalSize];
+ TSDataType[] tsDataTypes = new TSDataType[totalSize];
+
+ int curPos = 0;
+ for (int index : entry.getValue()) {
+ System.arraycopy(
+ measurementsList.get(index),
+ 0,
+ measurements,
+ curPos,
+ measurementsList.get(index).length);
+ System.arraycopy(
+ tsDataTypesList.get(index), 0, tsDataTypes, curPos, tsDataTypesList.get(index).length);
+ curPos += measurementsList.get(index).length;
+ }
- ClusterSchemaTree missingSchemaTree;
- for (int i = 0; i < devicePathList.size(); i++) {
- missingSchemaTree =
- checkAndAutoCreateMissingMeasurements(
- schemaTree,
- devicePathList.get(i),
- measurementsList.get(i),
- tsDataTypesList.get(i),
- isAlignedList.get(i));
- schemaTree.mergeSchemaTree(missingSchemaTree);
+ schemaTree.addDeviceInfo(
+ getDeviceSchemaInfoWithAutoCreate(entry.getKey(), measurements, tsDataTypes, isAligned));
}
+
return schemaTree;
}
@@ -179,121 +165,4 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
@Override
public void invalidAllCache() {}
-
- private Pair<List<String>, List<TSDataType>> checkMissingMeasurements(
- ISchemaTree schemaTree,
- PartialPath devicePath,
- String[] measurements,
- TSDataType[] tsDataTypes) {
- DeviceSchemaInfo deviceSchemaInfo =
- schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
- if (deviceSchemaInfo == null) {
- return new Pair<>(Arrays.asList(measurements), Arrays.asList(tsDataTypes));
- }
-
- List<String> missingMeasurements = new ArrayList<>();
- List<TSDataType> dataTypesOfMissingMeasurement = new ArrayList<>();
- List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
- for (int i = 0; i < measurements.length; i++) {
- if (schemaList.get(i) == null) {
- missingMeasurements.add(measurements[i]);
- dataTypesOfMissingMeasurement.add(tsDataTypes[i]);
- }
- }
-
- return new Pair<>(missingMeasurements, dataTypesOfMissingMeasurement);
- }
-
- private ClusterSchemaTree checkAndAutoCreateMissingMeasurements(
- ISchemaTree schemaTree,
- PartialPath devicePath,
- String[] measurements,
- TSDataType[] tsDataTypes,
- boolean isAligned) {
-
- Pair<List<String>, List<TSDataType>> checkResult =
- checkMissingMeasurements(schemaTree, devicePath, measurements, tsDataTypes);
-
- List<String> missingMeasurements = checkResult.left;
- List<TSDataType> dataTypesOfMissingMeasurement = checkResult.right;
-
- if (missingMeasurements.isEmpty()) {
- return new ClusterSchemaTree();
- }
-
- internalCreateTimeseries(
- devicePath, missingMeasurements, dataTypesOfMissingMeasurement, isAligned);
-
- PathPatternTree patternTree = new PathPatternTree();
- for (String measurement : missingMeasurements) {
- patternTree.appendFullPath(devicePath, measurement);
- }
- ClusterSchemaTree reFetchSchemaTree = fetchSchema(patternTree);
-
- Pair<List<String>, List<TSDataType>> recheckResult =
- checkMissingMeasurements(
- reFetchSchemaTree,
- devicePath,
- missingMeasurements.toArray(new String[0]),
- dataTypesOfMissingMeasurement.toArray(new TSDataType[0]));
-
- missingMeasurements = recheckResult.left;
- if (!missingMeasurements.isEmpty()) {
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append("(");
- for (String missingMeasurement : missingMeasurements) {
- stringBuilder.append(missingMeasurement).append(" ");
- }
- stringBuilder.append(")");
- throw new RuntimeException(
- String.format(
- "Failed to auto create schema, devicePath: %s, measurements: %s",
- devicePath.getFullPath(), stringBuilder));
- }
-
- return reFetchSchemaTree;
- }
-
- private void internalCreateTimeseries(
- PartialPath devicePath,
- List<String> measurements,
- List<TSDataType> tsDataTypes,
- boolean isAligned) {
- try {
- if (isAligned) {
- CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan = new CreateAlignedTimeSeriesPlan();
- createAlignedTimeSeriesPlan.setPrefixPath(devicePath);
- createAlignedTimeSeriesPlan.setMeasurements(measurements);
- createAlignedTimeSeriesPlan.setDataTypes(tsDataTypes);
- List<TSEncoding> encodings = new ArrayList<>();
- List<CompressionType> compressors = new ArrayList<>();
- for (TSDataType dataType : tsDataTypes) {
- encodings.add(getDefaultEncoding(dataType));
- compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
- }
- createAlignedTimeSeriesPlan.setEncodings(encodings);
- createAlignedTimeSeriesPlan.setCompressors(compressors);
- SchemaRegionId schemaRegionId =
- localConfigNode.getBelongedSchemaRegionIdWithAutoCreate(devicePath);
- ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
- schemaRegion.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
- } else {
- for (int i = 0; i < measurements.size(); i++) {
- CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan();
- createTimeSeriesPlan.setPath(
- new PartialPath(devicePath.getFullPath(), measurements.get(i)));
- createTimeSeriesPlan.setDataType(tsDataTypes.get(i));
- createTimeSeriesPlan.setEncoding(getDefaultEncoding(tsDataTypes.get(i)));
- createTimeSeriesPlan.setCompressor(
- TSFileDescriptor.getInstance().getConfig().getCompressor());
- SchemaRegionId schemaRegionId =
- localConfigNode.getBelongedSchemaRegionIdWithAutoCreate(devicePath);
- ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
- schemaRegion.createTimeseries(createTimeSeriesPlan, -1);
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("cannot auto create schema ", e);
- }
- }
}