You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/05 04:24:01 UTC
[iotdb] branch master updated: [IOTDB-3057]Auto create schema (#5770)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c98719fb4a [IOTDB-3057]Auto create schema (#5770)
c98719fb4a is described below
commit c98719fb4a3a21709e0847fd1789296935f05077
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Thu May 5 12:23:55 2022 +0800
[IOTDB-3057]Auto create schema (#5770)
---
.../db/metadata/cache/DataNodeSchemaCache.java | 151 ++++--------
.../iotdb/db/metadata/cache/SchemaCacheEntity.java | 114 ---------
.../iotdb/db/metadata/cache/SchemaCacheEntry.java | 58 +++++
.../db/mpp/common/schematree/DeviceSchemaInfo.java | 5 +-
.../db/mpp/common/schematree/PathPatternTree.java | 9 +-
.../iotdb/db/mpp/common/schematree/SchemaTree.java | 44 ++--
.../common/schematree/node/SchemaInternalNode.java | 7 +-
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 260 +++++++++++++++++++--
.../db/metadata/cache/DataNodeSchemaCacheTest.java | 105 +++++++--
9 files changed, 469 insertions(+), 284 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 84f79dbeac..116d2af8fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -19,53 +19,36 @@
package org.apache.iotdb.db.metadata.cache;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
-import org.apache.iotdb.db.mpp.plan.analyze.FakeSchemaFetcherImpl;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
/**
* This class takes the responsibility of metadata cache management of all DataRegions under
* StorageEngine
*/
public class DataNodeSchemaCache {
- private static final Logger logger = LoggerFactory.getLogger(DataNodeSchemaCache.class);
-
- private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private Cache<PartialPath, SchemaCacheEntity> schemaEntityCache;
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- // TODO use fakeSchemaFetcherImpl for test temporarily
- private static final ISchemaFetcher schemaFetcher = new FakeSchemaFetcherImpl();
+ private final Cache<PartialPath, SchemaCacheEntry> cache;
private DataNodeSchemaCache() {
- schemaEntityCache =
- Caffeine.newBuilder().maximumSize(config.getDataNodeSchemaCacheSize()).build();
+ cache = Caffeine.newBuilder().maximumSize(config.getDataNodeSchemaCacheSize()).build();
}
public static DataNodeSchemaCache getInstance() {
- return DataNodeSchemaCache.DataNodeSchemaEntryCacheHolder.INSTANCE;
+ return DataNodeSchemaCacheHolder.INSTANCE;
}
/** singleton pattern. */
- private static class DataNodeSchemaEntryCacheHolder {
+ private static class DataNodeSchemaCacheHolder {
private static final DataNodeSchemaCache INSTANCE = new DataNodeSchemaCache();
}
@@ -76,91 +59,41 @@ public class DataNodeSchemaCache {
* @param measurements
* @return timeseries partialPath and its SchemaEntity
*/
- public Map<PartialPath, SchemaCacheEntity> getSchemaEntity(
- PartialPath devicePath, String[] measurements) {
- Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap = new HashMap<>();
- SchemaCacheEntity schemaCacheEntity;
- List<String> fetchMeasurements = new ArrayList<>();
+ public SchemaTree get(PartialPath devicePath, String[] measurements) {
+ SchemaTree schemaTree = new SchemaTree();
+ SchemaCacheEntry schemaCacheEntry;
for (String measurement : measurements) {
- PartialPath path = null;
- try {
- path = new PartialPath(devicePath.getFullPath(), measurement);
- } catch (IllegalPathException e) {
- logger.error(
- "Create PartialPath:{} failed.",
- devicePath.getFullPath() + TsFileConstant.PATH_SEPARATOR + measurement);
+ PartialPath path = devicePath.concatNode(measurement);
+ schemaCacheEntry = cache.getIfPresent(path);
+ if (schemaCacheEntry != null) {
+ schemaTree.appendSingleMeasurement(
+ devicePath.concatNode(
+ schemaCacheEntry.getSchemaEntryId()), // the cached path may be alias path
+ schemaCacheEntry.getMeasurementSchema(),
+ schemaCacheEntry.getAlias(),
+ schemaCacheEntry.isAligned());
}
- schemaCacheEntity = schemaEntityCache.getIfPresent(path);
- if (schemaCacheEntity != null) {
- schemaCacheEntityMap.put(path, schemaCacheEntity);
- } else {
- fetchMeasurements.add(measurement);
- }
- }
- if (fetchMeasurements.size() != 0) {
- SchemaTree schemaTree;
- schemaTree = schemaFetcher.fetchSchema(new PathPatternTree(devicePath, fetchMeasurements));
- // TODO need to construct schemaEntry from schemaTree
-
}
- return schemaCacheEntityMap;
+ return schemaTree;
}
- /**
- * Get SchemaEntity info with auto create schema
- *
- * @param devicePath
- * @param measurements
- * @param tsDataTypes
- * @param isAligned
- * @return timeseries partialPath and its SchemaEntity
- */
- public Map<PartialPath, SchemaCacheEntity> getSchemaEntityWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean isAligned) {
- Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap = new HashMap<>();
- SchemaCacheEntity schemaCacheEntity;
- List<String> fetchMeasurements = new ArrayList<>();
- List<TSDataType> fetchTsDataTypes = new ArrayList<>();
- for (int i = 0; i < measurements.length; i++) {
- PartialPath path = null;
- try {
- path = new PartialPath(devicePath.getFullPath(), measurements[i]);
- } catch (IllegalPathException e) {
- logger.error(
- "Create PartialPath:{} failed.",
- devicePath.getFullPath() + TsFileConstant.PATH_SEPARATOR + measurements[i]);
- }
- schemaCacheEntity = schemaEntityCache.getIfPresent(path);
- if (schemaCacheEntity != null) {
- schemaCacheEntityMap.put(path, schemaCacheEntity);
- } else {
- fetchMeasurements.add(measurements[i]);
- fetchTsDataTypes.add(tsDataTypes[i]);
- }
- }
- if (fetchMeasurements.size() != 0) {
- SchemaTree schemaTree;
- schemaTree =
- schemaFetcher.fetchSchemaWithAutoCreate(
- devicePath,
- fetchMeasurements.toArray(new String[fetchMeasurements.size()]),
- fetchTsDataTypes.toArray(new TSDataType[fetchTsDataTypes.size()]),
- isAligned);
- // TODO need to construct schemaEntry from schemaTree
-
- for (int i = 0; i < fetchMeasurements.size(); i++) {
- try {
- PartialPath path = new PartialPath(devicePath.getFullPath(), fetchMeasurements.get(i));
- SchemaCacheEntity entity =
- new SchemaCacheEntity(fetchMeasurements.get(i), fetchTsDataTypes.get(i), isAligned);
- schemaEntityCache.put(path, entity);
- schemaCacheEntityMap.put(path, entity);
- } catch (IllegalPathException e) {
- logger.error("Create PartialPath:{} failed.", devicePath.getFullPath());
- }
+ public void put(SchemaTree schemaTree) {
+ for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
+ SchemaCacheEntry schemaCacheEntry =
+ new SchemaCacheEntry(
+ (MeasurementSchema) measurementPath.getMeasurementSchema(),
+ measurementPath.isMeasurementAliasExists()
+ ? measurementPath.getMeasurementAlias()
+ : null,
+ measurementPath.isUnderAlignedEntity());
+ cache.put(new PartialPath(measurementPath.getNodes()), schemaCacheEntry);
+ if (measurementPath.isMeasurementAliasExists()) {
+ // cache alias path
+ cache.put(
+ measurementPath.getDevicePath().concatNode(measurementPath.getMeasurementAlias()),
+ schemaCacheEntry);
}
}
- return schemaCacheEntityMap;
}
/**
@@ -170,17 +103,15 @@ public class DataNodeSchemaCache {
* @return
*/
public void invalidate(PartialPath partialPath) {
- schemaEntityCache.invalidate(partialPath);
+ cache.invalidate(partialPath);
}
- @TestOnly
- public void cleanUp() {
- schemaEntityCache.invalidateAll();
- schemaEntityCache.cleanUp();
+ public long estimatedSize() {
+ return cache.estimatedSize();
}
- @TestOnly
- protected Cache<PartialPath, SchemaCacheEntity> getSchemaEntityCache() {
- return schemaEntityCache;
+ public void cleanUp() {
+ cache.invalidateAll();
+ cache.cleanUp();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java
deleted file mode 100644
index 0e07b56e3f..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.metadata.cache;
-
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-
-public class SchemaCacheEntity {
- private final String schemaEntryId;
-
- private TSDataType tsDataType;
-
- private TSEncoding tsEncoding;
-
- private CompressionType compressionType;
-
- private String alias;
-
- private boolean isAligned;
-
- @TestOnly
- public SchemaCacheEntity() {
- this.schemaEntryId = "1";
- }
-
- public SchemaCacheEntity(String schemaEntryId, TSDataType tsDataType, boolean isAligned) {
- this.schemaEntryId = schemaEntryId;
- this.tsDataType = tsDataType;
- this.isAligned = isAligned;
- this.tsEncoding =
- TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder());
- this.compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
- this.alias = "";
- }
-
- public SchemaCacheEntity(
- String schemaEntryId,
- TSDataType tsDataType,
- TSEncoding tsEncoding,
- CompressionType compressionType,
- String alias,
- boolean isAligned) {
- this.schemaEntryId = schemaEntryId;
- this.tsDataType = tsDataType;
- this.tsEncoding = tsEncoding;
- this.compressionType = compressionType;
- this.alias = alias;
- this.isAligned = isAligned;
- }
-
- public String getSchemaEntryId() {
- return schemaEntryId;
- }
-
- public TSDataType getTsDataType() {
- return tsDataType;
- }
-
- public void setTsDataType(TSDataType tsDataType) {
- this.tsDataType = tsDataType;
- }
-
- public TSEncoding getTsEncoding() {
- return tsEncoding;
- }
-
- public void setTsEncoding(TSEncoding tsEncoding) {
- this.tsEncoding = tsEncoding;
- }
-
- public CompressionType getCompressionType() {
- return compressionType;
- }
-
- public void setCompressionType(CompressionType compressionType) {
- this.compressionType = compressionType;
- }
-
- public String getAlias() {
- return alias;
- }
-
- public void setAlias(String alias) {
- this.alias = alias;
- }
-
- public boolean isAligned() {
- return isAligned;
- }
-
- public void setAligned(boolean aligned) {
- isAligned = aligned;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
new file mode 100644
index 0000000000..feb8f24eec
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+public class SchemaCacheEntry {
+
+ private final MeasurementSchema measurementSchema;
+
+ private final String alias;
+
+ private final boolean isAligned;
+
+ SchemaCacheEntry(MeasurementSchema measurementSchema, String alias, boolean isAligned) {
+ this.measurementSchema = measurementSchema;
+ this.alias = alias;
+ this.isAligned = isAligned;
+ }
+
+ public String getSchemaEntryId() {
+ return measurementSchema.getMeasurementId();
+ }
+
+ public MeasurementSchema getMeasurementSchema() {
+ return measurementSchema;
+ }
+
+ public TSDataType getTsDataType() {
+ return measurementSchema.getType();
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public boolean isAligned() {
+ return isAligned;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java
index 3b438114d8..aaa1dc6e59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java
@@ -50,7 +50,7 @@ public class DeviceSchemaInfo {
public List<MeasurementSchema> getMeasurementSchemaList() {
return measurementNodeList.stream()
- .map(SchemaMeasurementNode::getSchema)
+ .map(measurementNode -> measurementNode == null ? null : measurementNode.getSchema())
.collect(Collectors.toList());
}
@@ -58,6 +58,9 @@ public class DeviceSchemaInfo {
return measurementNodeList.stream()
.map(
measurementNode -> {
+ if (measurementNode == null) {
+ return null;
+ }
MeasurementPath measurementPath =
new MeasurementPath(
devicePath.concatNode(measurementNode.getName()),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
index 05051712eb..f15ee39b68 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
@@ -54,10 +54,10 @@ public class PathPatternTree {
appendPaths(devicePath, Arrays.asList(measurements));
}
- public PathPatternTree(PartialPath deivcePath, List<String> measurements) {
+ public PathPatternTree(PartialPath devicePath, List<String> measurements) {
this.root = new PathPatternNode(SQLConstant.ROOT);
this.pathList = new ArrayList<>();
- appendPaths(deivcePath, measurements);
+ appendPaths(devicePath, measurements);
}
public PathPatternTree(Map<PartialPath, List<String>> deviceToMeasurementsMap) {
@@ -251,4 +251,9 @@ public class PathPatternTree {
}
return this.getRoot().equalWith(that.getRoot());
}
+
+ public boolean isEmpty() {
+ return (root.getChildren() == null || root.getChildren().isEmpty())
+ && (pathList == null || pathList.isEmpty());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 365c5c098d..e4100e75e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.common.schematree;
-import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
@@ -40,6 +39,7 @@ import java.util.Deque;
import java.util.List;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
+import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_MATCH_PATTERN;
import static org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode.SCHEMA_ENTITY_NODE;
import static org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode.SCHEMA_MEASUREMENT_NODE;
@@ -71,14 +71,17 @@ public class SchemaTree {
return new Pair<>(visitor.getAllResult(), visitor.getNextOffset());
}
+ public List<MeasurementPath> getAllMeasurement() {
+ return searchMeasurementPaths(ALL_MATCH_PATTERN, 0, 0, false).left;
+ }
+
/**
* Get all device matching the path pattern.
*
* @param pathPattern the pattern of the target devices.
* @return A HashSet instance which stores info of the devices matching the given path pattern.
*/
- public List<DeviceSchemaInfo> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
- throws MetadataException {
+ public List<DeviceSchemaInfo> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch) {
SchemaTreeDeviceVisitor visitor = new SchemaTreeDeviceVisitor(root, pathPattern, isPrefixMatch);
return visitor.getAllResult();
}
@@ -89,12 +92,21 @@ public class SchemaTree {
String[] nodes = devicePath.getNodes();
SchemaNode cur = root;
for (int i = 1; i < nodes.length; i++) {
+ if (cur == null) {
+ return null;
+ }
cur = cur.getChild(nodes[i]);
}
List<SchemaMeasurementNode> measurementNodeList = new ArrayList<>();
+ SchemaNode node;
for (String measurement : measurements) {
- measurementNodeList.add(cur.getChild(measurement).getAsMeasurementNode());
+ node = cur.getChild(measurement);
+ if (node == null) {
+ measurementNodeList.add(null);
+ } else {
+ measurementNodeList.add(node.getAsMeasurementNode());
+ }
}
return new DeviceSchemaInfo(devicePath, cur.getAsEntityNode().isAligned(), measurementNodeList);
@@ -107,25 +119,31 @@ public class SchemaTree {
}
private void appendSingleMeasurementPath(MeasurementPath measurementPath) {
- String[] nodes = measurementPath.getNodes();
+ appendSingleMeasurement(
+ measurementPath,
+ (MeasurementSchema) measurementPath.getMeasurementSchema(),
+ measurementPath.isMeasurementAliasExists() ? measurementPath.getMeasurementAlias() : null,
+ measurementPath.isUnderAlignedEntity());
+ }
+
+ public void appendSingleMeasurement(
+ PartialPath path, MeasurementSchema schema, String alias, boolean isAligned) {
+ String[] nodes = path.getNodes();
SchemaNode cur = root;
SchemaNode child;
for (int i = 1; i < nodes.length; i++) {
child = cur.getChild(nodes[i]);
if (child == null) {
if (i == nodes.length - 1) {
- SchemaMeasurementNode measurementNode =
- new SchemaMeasurementNode(
- nodes[i], (MeasurementSchema) measurementPath.getMeasurementSchema());
- if (measurementPath.isMeasurementAliasExists()) {
- measurementNode.setAlias(measurementPath.getMeasurementAlias());
- cur.getAsEntityNode()
- .addAliasChild(measurementPath.getMeasurementAlias(), measurementNode);
+ SchemaMeasurementNode measurementNode = new SchemaMeasurementNode(nodes[i], schema);
+ if (alias != null) {
+ measurementNode.setAlias(alias);
+ cur.getAsEntityNode().addAliasChild(alias, measurementNode);
}
child = measurementNode;
} else if (i == nodes.length - 2) {
SchemaEntityNode entityNode = new SchemaEntityNode(nodes[i]);
- entityNode.setAligned(measurementPath.isUnderAlignedEntity());
+ entityNode.setAligned(isAligned);
child = entityNode;
} else {
child = new SchemaInternalNode(nodes[i]);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java
index 10af366554..a237fb19e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java
@@ -28,7 +28,7 @@ import java.util.Map;
public class SchemaInternalNode extends SchemaNode {
- protected Map<String, SchemaNode> children;
+ protected Map<String, SchemaNode> children = new HashMap<>();
public SchemaInternalNode(String name) {
super(name);
@@ -36,13 +36,10 @@ public class SchemaInternalNode extends SchemaNode {
@Override
public SchemaNode getChild(String name) {
- return children == null ? null : children.get(name);
+ return children.get(name);
}
public void addChild(String name, SchemaNode child) {
- if (children == null) {
- children = new HashMap<>();
- }
children.put(name, child);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index e9de9a302e..08d8257560 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -22,31 +22,48 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
public class ClusterSchemaFetcher implements ISchemaFetcher {
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private final Coordinator coordinator = Coordinator.getInstance();
private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
+ private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
private static final class ClusterSchemaFetcherHolder {
private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
@@ -62,7 +79,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
@Override
public SchemaTree fetchSchema(PathPatternTree patternTree) {
- SchemaPartition schemaPartition = partitionFetcher.getSchemaPartition(patternTree);
+ return fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
+ }
+
+ private SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
schemaPartition.getSchemaPartitionMap();
List<String> storageGroups = new ArrayList<>(schemaPartitionMap.keySet());
@@ -70,6 +90,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
SchemaFetchStatement schemaFetchStatement = new SchemaFetchStatement(patternTree);
schemaFetchStatement.setSchemaPartition(schemaPartition);
+ SchemaTree result = executeSchemaFetchQuery(schemaFetchStatement);
+ result.setStorageGroups(storageGroups);
+ return result;
+ }
+
+ private SchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
+
QueryId queryId =
new QueryId(String.valueOf(SessionManager.getInstance().requestQueryId(false)));
ExecutionResult executionResult =
@@ -84,7 +111,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
if (!tsBlock.isPresent()) {
break;
}
- result.setStorageGroups(storageGroups);
+
Binary binary;
SchemaTree fetchedSchemaTree;
Column column = tsBlock.get().getColumn(0);
@@ -101,22 +128,225 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
@Override
public SchemaTree fetchSchemaWithAutoCreate(
- PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
- // todo implement auto create schema
- return fetchSchema(new PathPatternTree(devicePath, measurements));
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean isAligned) {
+
+ SchemaTree schemaTree = schemaCache.get(devicePath, measurements);
+ Pair<List<String>, List<TSDataType>> missingMeasurements =
+ checkMissingMeasurements(schemaTree, devicePath, measurements, tsDataTypes);
+
+ PathPatternTree patternTree =
+ new PathPatternTree(devicePath, missingMeasurements.left.toArray(new String[0]));
+
+ if (patternTree.isEmpty()) {
+ return schemaTree;
+ }
+
+ SchemaTree remoteSchemaTree;
+
+ if (!config.isAutoCreateSchemaEnabled()) {
+ remoteSchemaTree = fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
+ schemaTree.mergeSchemaTree(remoteSchemaTree);
+ schemaCache.put(remoteSchemaTree);
+ return schemaTree;
+ }
+
+ remoteSchemaTree =
+ fetchSchema(patternTree, partitionFetcher.getOrCreateSchemaPartition(patternTree));
+ schemaTree.mergeSchemaTree(remoteSchemaTree);
+ schemaCache.put(remoteSchemaTree);
+
+ SchemaTree missingSchemaTree =
+ checkAndAutoCreateMissingMeasurements(
+ remoteSchemaTree,
+ devicePath,
+ missingMeasurements.left.toArray(new String[0]),
+ missingMeasurements.right.toArray(new TSDataType[0]),
+ isAligned);
+
+ schemaTree.mergeSchemaTree(missingSchemaTree);
+ schemaCache.put(missingSchemaTree);
+
+ return schemaTree;
}
@Override
public SchemaTree fetchSchemaListWithAutoCreate(
- List<PartialPath> devicePath,
- List<String[]> measurements,
- List<TSDataType[]> tsDataTypes,
- List<Boolean> aligned) {
- Map<PartialPath, List<String>> deviceToMeasurementMap = new HashMap<>();
- for (int i = 0; i < devicePath.size(); i++) {
- deviceToMeasurementMap.put(devicePath.get(i), Arrays.asList(measurements.get(i)));
+ List<PartialPath> devicePathList,
+ List<String[]> measurementsList,
+ List<TSDataType[]> tsDataTypesList,
+ List<Boolean> isAlignedList) {
+
+ SchemaTree schemaTree = new SchemaTree();
+ PathPatternTree patternTree = new PathPatternTree();
+ for (int i = 0; i < devicePathList.size(); i++) {
+ schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), measurementsList.get(i)));
+ patternTree.appendPaths(
+ devicePathList.get(i),
+ checkMissingMeasurements(
+ schemaTree,
+ devicePathList.get(i),
+ measurementsList.get(i),
+ tsDataTypesList.get(i))
+ .left);
+ }
+
+ if (patternTree.isEmpty()) {
+ return schemaTree;
+ }
+
+ SchemaTree remoteSchemaTree;
+
+ if (!config.isAutoCreateSchemaEnabled()) {
+ remoteSchemaTree = fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
+ schemaTree.mergeSchemaTree(remoteSchemaTree);
+ schemaCache.put(remoteSchemaTree);
+ return schemaTree;
+ }
+
+ remoteSchemaTree =
+ fetchSchema(patternTree, partitionFetcher.getOrCreateSchemaPartition(patternTree));
+ schemaTree.mergeSchemaTree(remoteSchemaTree);
+ schemaCache.put(remoteSchemaTree);
+
+ SchemaTree missingSchemaTree;
+ for (int i = 0; i < devicePathList.size(); i++) {
+ missingSchemaTree =
+ checkAndAutoCreateMissingMeasurements(
+ schemaTree,
+ devicePathList.get(i),
+ measurementsList.get(i),
+ tsDataTypesList.get(i),
+ isAlignedList.get(i));
+ schemaTree.mergeSchemaTree(missingSchemaTree);
+ schemaCache.put(missingSchemaTree);
+ }
+ return schemaTree;
+ }
+
+ private SchemaTree checkAndAutoCreateMissingMeasurements(
+ SchemaTree schemaTree,
+ PartialPath devicePath,
+ String[] measurements,
+ TSDataType[] tsDataTypes,
+ boolean isAligned) {
+
+ Pair<List<String>, List<TSDataType>> checkResult =
+ checkMissingMeasurements(schemaTree, devicePath, measurements, tsDataTypes);
+
+ List<String> missingMeasurements = checkResult.left;
+ List<TSDataType> dataTypesOfMissingMeasurement = checkResult.right;
+
+ if (missingMeasurements.isEmpty()) {
+ return new SchemaTree();
+ }
+
+ internalCreateTimeseries(
+ devicePath, missingMeasurements, dataTypesOfMissingMeasurement, isAligned);
+
+ SchemaTree reFetchSchemaTree =
+ fetchSchema(new PathPatternTree(devicePath, missingMeasurements));
+
+ Pair<List<String>, List<TSDataType>> recheckResult =
+ checkMissingMeasurements(
+ reFetchSchemaTree,
+ devicePath,
+ missingMeasurements.toArray(new String[0]),
+ dataTypesOfMissingMeasurement.toArray(new TSDataType[0]));
+
+ missingMeasurements = recheckResult.left;
+ if (!missingMeasurements.isEmpty()) {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("(");
+ for (String missingMeasurement : missingMeasurements) {
+ stringBuilder.append(missingMeasurement).append(" ");
+ }
+ stringBuilder.append(")");
+ throw new RuntimeException(
+ String.format(
+ "Failed to auto create schema, devicePath: %s, measurements: %s",
+ devicePath.getFullPath(), stringBuilder));
+ }
+
+ return reFetchSchemaTree;
+ }
+
+ private Pair<List<String>, List<TSDataType>> checkMissingMeasurements(
+ SchemaTree schemaTree,
+ PartialPath devicePath,
+ String[] measurements,
+ TSDataType[] tsDataTypes) {
+ DeviceSchemaInfo deviceSchemaInfo =
+ schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
+ if (deviceSchemaInfo == null) {
+ return new Pair<>(Arrays.asList(measurements), Arrays.asList(tsDataTypes));
+ }
+
+ List<String> missingMeasurements = new ArrayList<>();
+ List<TSDataType> dataTypesOfMissingMeasurement = new ArrayList<>();
+ List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
+ for (int i = 0; i < measurements.length; i++) {
+ if (schemaList.get(i) == null) {
+ missingMeasurements.add(measurements[i]);
+ dataTypesOfMissingMeasurement.add(tsDataTypes[i]);
+ }
+ }
+
+ return new Pair<>(missingMeasurements, dataTypesOfMissingMeasurement);
+ }
+
+ private void internalCreateTimeseries(
+ PartialPath devicePath,
+ List<String> measurements,
+ List<TSDataType> tsDataTypes,
+ boolean isAligned) {
+
+ if (isAligned) {
+ CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement =
+ new CreateAlignedTimeSeriesStatement();
+ createAlignedTimeSeriesStatement.setDevicePath(devicePath);
+ createAlignedTimeSeriesStatement.setMeasurements(measurements);
+ createAlignedTimeSeriesStatement.setDataTypes(tsDataTypes);
+ List<TSEncoding> encodings = new ArrayList<>();
+ List<CompressionType> compressors = new ArrayList<>();
+ for (TSDataType dataType : tsDataTypes) {
+ encodings.add(getDefaultEncoding(dataType));
+ compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
+ }
+ createAlignedTimeSeriesStatement.setEncodings(encodings);
+ createAlignedTimeSeriesStatement.setCompressors(compressors);
+
+ executeCreateStatement(createAlignedTimeSeriesStatement);
+ } else {
+ // todo @zyk implement batch create
+ for (int i = 0; i < measurements.size(); i++) {
+ CreateTimeSeriesStatement createTimeSeriesStatement = new CreateTimeSeriesStatement();
+ createTimeSeriesStatement.setPath(devicePath.concatNode(measurements.get(i)));
+ createTimeSeriesStatement.setDataType(tsDataTypes.get(i));
+ createTimeSeriesStatement.setEncoding(getDefaultEncoding(tsDataTypes.get(i)));
+ createTimeSeriesStatement.setCompressor(
+ TSFileDescriptor.getInstance().getConfig().getCompressor());
+ createTimeSeriesStatement.setProps(Collections.emptyMap());
+
+ executeCreateStatement(createTimeSeriesStatement);
+ }
+ }
+ }
+
+ private void executeCreateStatement(Statement statement) {
+ QueryId queryId =
+ new QueryId(String.valueOf(SessionManager.getInstance().requestQueryId(false)));
+ ExecutionResult executionResult =
+ coordinator.execute(statement, queryId, null, "", partitionFetcher, this);
+ // TODO: throw exception
+ try {
+ int statusCode = executionResult.status.getCode();
+ if (statusCode != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && statusCode != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
+ throw new RuntimeException(
+ "cannot auto create schema, status is: " + executionResult.status);
+ }
+ } finally {
+ coordinator.getQueryExecution(queryId).stopAndCleanup();
}
- // todo implement auto create schema
- return fetchSchema(new PathPatternTree(deviceToMeasurementMap));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index facec3218f..91f9fbf375 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.metadata.cache;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
@@ -29,6 +31,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Map;
+import java.util.stream.Collectors;
public class DataNodeSchemaCacheTest {
DataNodeSchemaCache dataNodeSchemaCache;
@@ -50,46 +53,100 @@ public class DataNodeSchemaCacheTest {
measurements[0] = "s1";
measurements[1] = "s2";
measurements[2] = "s3";
- TSDataType[] tsDataTypes = new TSDataType[3];
- tsDataTypes[0] = TSDataType.INT32;
- tsDataTypes[1] = TSDataType.FLOAT;
- tsDataTypes[2] = TSDataType.BOOLEAN;
-
- Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap1 =
- dataNodeSchemaCache.getSchemaEntityWithAutoCreate(
- device1, measurements, tsDataTypes, false);
+
+ dataNodeSchemaCache.put(generateSchemaTree1());
+
+ Map<PartialPath, SchemaCacheEntry> schemaCacheEntryMap =
+ dataNodeSchemaCache.get(device1, measurements).getAllMeasurement().stream()
+ .collect(
+ Collectors.toMap(
+ o -> new PartialPath(o.getNodes()),
+ o ->
+ new SchemaCacheEntry(
+ (MeasurementSchema) o.getMeasurementSchema(),
+ null,
+ o.isUnderAlignedEntity())));
Assert.assertEquals(
TSDataType.INT32,
- schemaCacheEntityMap1.get(new PartialPath("root.sg1.d1.s1")).getTsDataType());
+ schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s1")).getTsDataType());
Assert.assertEquals(
TSDataType.FLOAT,
- schemaCacheEntityMap1.get(new PartialPath("root.sg1.d1.s2")).getTsDataType());
+ schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s2")).getTsDataType());
Assert.assertEquals(
TSDataType.BOOLEAN,
- schemaCacheEntityMap1.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
- Assert.assertEquals(3, dataNodeSchemaCache.getSchemaEntityCache().estimatedSize());
+ schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
+ Assert.assertEquals(3, dataNodeSchemaCache.estimatedSize());
String[] otherMeasurements = new String[3];
otherMeasurements[0] = "s3";
otherMeasurements[1] = "s4";
otherMeasurements[2] = "s5";
- TSDataType[] otherTsDataTypes = new TSDataType[3];
- otherTsDataTypes[0] = TSDataType.BOOLEAN;
- otherTsDataTypes[1] = TSDataType.TEXT;
- otherTsDataTypes[2] = TSDataType.INT64;
-
- Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap2 =
- dataNodeSchemaCache.getSchemaEntityWithAutoCreate(
- device1, otherMeasurements, otherTsDataTypes, false);
+
+ dataNodeSchemaCache.put(generateSchemaTree2());
+
+ schemaCacheEntryMap =
+ dataNodeSchemaCache.get(device1, otherMeasurements).getAllMeasurement().stream()
+ .collect(
+ Collectors.toMap(
+ o -> new PartialPath(o.getNodes()),
+ o ->
+ new SchemaCacheEntry(
+ (MeasurementSchema) o.getMeasurementSchema(),
+ null,
+ o.isUnderAlignedEntity())));
Assert.assertEquals(
TSDataType.BOOLEAN,
- schemaCacheEntityMap2.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
+ schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
Assert.assertEquals(
TSDataType.TEXT,
- schemaCacheEntityMap2.get(new PartialPath("root.sg1.d1.s4")).getTsDataType());
+ schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s4")).getTsDataType());
Assert.assertEquals(
TSDataType.INT64,
- schemaCacheEntityMap2.get(new PartialPath("root.sg1.d1.s5")).getTsDataType());
- Assert.assertEquals(5, dataNodeSchemaCache.getSchemaEntityCache().estimatedSize());
+ schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s5")).getTsDataType());
+ Assert.assertEquals(5, dataNodeSchemaCache.estimatedSize());
+ }
+
+ private SchemaTree generateSchemaTree1() throws IllegalPathException {
+ SchemaTree schemaTree = new SchemaTree();
+
+ schemaTree.appendSingleMeasurement(
+ new PartialPath("root.sg1.d1.s1"),
+ new MeasurementSchema("s1", TSDataType.INT32),
+ null,
+ false);
+ schemaTree.appendSingleMeasurement(
+ new PartialPath("root.sg1.d1.s2"),
+ new MeasurementSchema("s2", TSDataType.FLOAT),
+ null,
+ false);
+ schemaTree.appendSingleMeasurement(
+ new PartialPath("root.sg1.d1.s3"),
+ new MeasurementSchema("s3", TSDataType.BOOLEAN),
+ null,
+ false);
+
+ return schemaTree;
+ }
+
+ private SchemaTree generateSchemaTree2() throws IllegalPathException {
+ SchemaTree schemaTree = new SchemaTree();
+
+ schemaTree.appendSingleMeasurement(
+ new PartialPath("root.sg1.d1.s3"),
+ new MeasurementSchema("s3", TSDataType.BOOLEAN),
+ null,
+ false);
+ schemaTree.appendSingleMeasurement(
+ new PartialPath("root.sg1.d1.s4"),
+ new MeasurementSchema("s4", TSDataType.TEXT),
+ null,
+ false);
+ schemaTree.appendSingleMeasurement(
+ new PartialPath("root.sg1.d1.s5"),
+ new MeasurementSchema("s5", TSDataType.INT64),
+ null,
+ false);
+
+ return schemaTree;
}
}