You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/24 12:14:14 UTC
[iotdb] branch master updated: [IOTDB-3184] Implement Timeseries version and blacklist (#5998)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4e3e7204d8 [IOTDB-3184] Implement Timeseries version and blacklist (#5998)
4e3e7204d8 is described below
commit 4e3e7204d8ba7fc5cc654fce93d43282af90181a
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Tue May 24 20:14:08 2022 +0800
[IOTDB-3184] Implement Timeseries version and blacklist (#5998)
---
.../schemaregion/rocksdb/RSchemaRegion.java | 14 +++++++++
.../rocksdb/mnode/RMeasurementMNode.java | 10 ++++++
.../db/metadata/cache/DataNodeSchemaCache.java | 17 +++-------
.../iotdb/db/metadata/cache/SchemaCacheEntry.java | 16 +++++-----
.../idtable/entry/InsertMeasurementMNode.java | 10 ++++++
.../iotdb/db/metadata/mnode/IMeasurementMNode.java | 4 +++
.../iotdb/db/metadata/mnode/MeasurementMNode.java | 12 ++++++++
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 1 +
.../iotdb/db/metadata/path/MeasurementPath.java | 25 +++++++++++++++
.../db/metadata/schemaregion/ISchemaRegion.java | 6 ++++
.../schemaregion/SchemaRegionMemoryImpl.java | 20 ++++++++++++
.../schemaregion/SchemaRegionSchemaFileImpl.java | 14 +++++++++
.../db/metadata/utils/TimeseriesVersionUtil.java | 28 +++++++++++++++++
.../metadata/visitor/SchemaExecutionVisitor.java | 5 +--
.../iotdb/db/mpp/common/schematree/SchemaTree.java | 18 +++++++++--
.../common/schematree/node/SchemaInternalNode.java | 5 +++
.../schematree/node/SchemaMeasurementNode.java | 12 ++++++--
.../db/mpp/common/schematree/node/SchemaNode.java | 2 ++
.../visitor/SchemaTreeMeasurementVisitor.java | 8 ++---
.../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 8 ++---
.../iotdb/db/mpp/plan/analyze/SchemaValidator.java | 22 +++++++++++--
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 12 ++++++--
.../write/CreateAlignedTimeSeriesNode.java | 30 +++++++++++-------
.../node/metedata/write/CreateTimeSeriesNode.java | 36 +++++++++++++---------
.../db/metadata/cache/DataNodeSchemaCacheTest.java | 26 ++++++++++------
.../db/mpp/common/schematree/SchemaTreeTest.java | 24 ++++++++++++---
.../schema/SchemaFetchScanOperatorTest.java | 9 ++++--
.../iotdb/db/service/InternalServiceImplTest.java | 10 +++++-
28 files changed, 321 insertions(+), 83 deletions(-)
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index da467999ae..a262ac2102 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -264,6 +264,13 @@ public class RSchemaRegion implements ISchemaRegion {
}
}
+ @Override
+ public void createTimeseries(CreateTimeSeriesPlan plan, long offset, String version)
+ throws MetadataException {
+ throw new UnsupportedOperationException(
+ "RSchemaRegion currently doesn't support timeseries with version");
+ }
+
@TestOnly
protected void createTimeseries(
PartialPath path,
@@ -527,6 +534,13 @@ public class RSchemaRegion implements ISchemaRegion {
}
}
+ @Override
+ public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan, List<String> versionList)
+ throws MetadataException {
+ throw new UnsupportedOperationException(
+ "RSchemaRegion currently doesn't support timeseries with version");
+ }
+
private void createEntityRecursively(String[] nodes, int start, int end, boolean aligned)
throws RocksDBException, MetadataException, InterruptedException {
if (start <= end) {
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java
index eab9ba710a..3729362136 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java
@@ -150,6 +150,16 @@ public class RMeasurementMNode extends RMNode implements IMeasurementMNode {
throw new UnsupportedOperationException();
}
+ @Override
+ public String getVersion() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setVersion(String version) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void serializeTo(MLogWriter logWriter) throws IOException {
throw new UnsupportedOperationException();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index f2f6b98c3c..d75f7f3173 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -71,8 +71,9 @@ public class DataNodeSchemaCache {
devicePath.concatNode(
schemaCacheEntry.getSchemaEntryId()), // the cached path may be alias path
schemaCacheEntry.getMeasurementSchema(),
- schemaCacheEntry.getAlias(),
- schemaCacheEntry.isAligned());
+ null,
+ schemaCacheEntry.isAligned(),
+ schemaCacheEntry.getVersion());
}
}
return schemaTree;
@@ -83,17 +84,9 @@ public class DataNodeSchemaCache {
SchemaCacheEntry schemaCacheEntry =
new SchemaCacheEntry(
(MeasurementSchema) measurementPath.getMeasurementSchema(),
- measurementPath.isMeasurementAliasExists()
- ? measurementPath.getMeasurementAlias()
- : null,
- measurementPath.isUnderAlignedEntity());
+ measurementPath.isUnderAlignedEntity(),
+ measurementPath.getVersion());
cache.put(new PartialPath(measurementPath.getNodes()), schemaCacheEntry);
- if (measurementPath.isMeasurementAliasExists()) {
- // cache alias path
- cache.put(
- measurementPath.getDevicePath().concatNode(measurementPath.getMeasurementAlias()),
- schemaCacheEntry);
- }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
index b3638ff586..d86468e2f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -28,16 +28,16 @@ public class SchemaCacheEntry {
private final MeasurementSchema measurementSchema;
- private final String alias;
-
private final boolean isAligned;
+ private final String version;
+
private volatile ILastCacheContainer lastCacheContainer = null;
- SchemaCacheEntry(MeasurementSchema measurementSchema, String alias, boolean isAligned) {
+ SchemaCacheEntry(MeasurementSchema measurementSchema, boolean isAligned, String version) {
this.measurementSchema = measurementSchema;
- this.alias = alias;
this.isAligned = isAligned;
+ this.version = version;
}
public String getSchemaEntryId() {
@@ -52,14 +52,14 @@ public class SchemaCacheEntry {
return measurementSchema.getType();
}
- public String getAlias() {
- return alias;
- }
-
public boolean isAligned() {
return isAligned;
}
+ public String getVersion() {
+ return version;
+ }
+
public ILastCacheContainer getLastCacheContainer() {
if (lastCacheContainer == null) {
synchronized (this) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/InsertMeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/InsertMeasurementMNode.java
index dfc48e03fc..33ae4dc40d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/InsertMeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/InsertMeasurementMNode.java
@@ -96,6 +96,16 @@ public class InsertMeasurementMNode implements IMeasurementMNode {
@Override
public void setLastCacheContainer(ILastCacheContainer lastCacheContainer) {}
+ @Override
+ public String getVersion() {
+ throw new UnsupportedOperationException("insert measurement mnode doesn't support this method");
+ }
+
+ @Override
+ public void setVersion(String version) {
+ throw new UnsupportedOperationException("insert measurement mnode doesn't support this method");
+ }
+
@Override
public IMeasurementSchema getSchema() {
return schema;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java
index fca21a9cbd..c6d1efb351 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java
@@ -51,4 +51,8 @@ public interface IMeasurementMNode extends IMNode {
ILastCacheContainer getLastCacheContainer();
void setLastCacheContainer(ILastCacheContainer lastCacheContainer);
+
+ String getVersion();
+
+ void setVersion(String version);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
index 3909f0a471..13cab78399 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
@@ -48,6 +48,8 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
/** last value cache */
private volatile ILastCacheContainer lastCacheContainer = null;
+ private String version = null;
+
/**
* MeasurementMNode factory method. The type of returned MeasurementMNode is according to the
* schema type. The default type is UnaryMeasurementMNode, which means if schema == null, an
@@ -148,6 +150,16 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
this.lastCacheContainer = lastCacheContainer;
}
+ @Override
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
@Override
public void serializeTo(MLogWriter logWriter) throws IOException {
logWriter.serializeMeasurementMNode(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index b3dc1b18da..1b900c23c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -610,6 +610,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
// only when user query with alias, the alias in path will be set
path.setMeasurementAlias(node.getAlias());
}
+ path.setVersion(node.getVersion());
result.add(path);
}
};
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index 7b9cd9a3c0..e0f4fd1dba 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.util.Objects;
public class MeasurementPath extends PartialPath {
@@ -44,6 +45,8 @@ public class MeasurementPath extends PartialPath {
// alias of measurement, null pointer cannot be serialized in thrift so empty string is instead
private String measurementAlias = "";
+ private String version = null;
+
public MeasurementPath() {}
public MeasurementPath(String measurementPath) throws IllegalPathException {
@@ -123,6 +126,14 @@ public class MeasurementPath extends PartialPath {
isUnderAlignedEntity = underAlignedEntity;
}
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
@Override
public PartialPath copy() {
MeasurementPath result = new MeasurementPath();
@@ -135,6 +146,15 @@ public class MeasurementPath extends PartialPath {
return result;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ MeasurementPath that = (MeasurementPath) o;
+ return Objects.equals(version, that.version);
+ }
+
/**
* if isUnderAlignedEntity is true, return an AlignedPath with only one sub sensor otherwise,
* return itself
@@ -173,6 +193,8 @@ public class MeasurementPath extends PartialPath {
}
ReadWriteIOUtils.write(isUnderAlignedEntity, byteBuffer);
ReadWriteIOUtils.write(measurementAlias, byteBuffer);
+
+ ReadWriteIOUtils.write(version, byteBuffer);
}
public static MeasurementPath deserialize(ByteBuffer byteBuffer) {
@@ -189,6 +211,9 @@ public class MeasurementPath extends PartialPath {
}
measurementPath.isUnderAlignedEntity = ReadWriteIOUtils.readBool(byteBuffer);
measurementPath.measurementAlias = ReadWriteIOUtils.readString(byteBuffer);
+
+ measurementPath.version = ReadWriteIOUtils.readString(byteBuffer);
+
measurementPath.nodes = partialPath.getNodes();
measurementPath.device = partialPath.getDevice();
measurementPath.fullPath = partialPath.getFullPath();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 4ec058a0c0..4f9cb3f045 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -100,8 +100,14 @@ public interface ISchemaRegion {
// region Interfaces for Timeseries operation
void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException;
+ void createTimeseries(CreateTimeSeriesPlan plan, long offset, String version)
+ throws MetadataException;
+
void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException;
+ void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan, List<String> versionList)
+ throws MetadataException;
+
/**
* Delete all timeseries matching the given path pattern. If using prefix match, the path pattern
* is used to match prefix path. All timeseries start with the matched prefix path will be
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 84815a9383..33a1fc7c5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -497,6 +497,12 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
+ createTimeseries(plan, offset, null);
+ }
+
+ @Override
+ public void createTimeseries(CreateTimeSeriesPlan plan, long offset, String version)
+ throws MetadataException {
if (!memoryStatistics.isAllowToCreateNewSeries()) {
throw new SeriesOverflowException();
}
@@ -516,6 +522,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
plan.getProps(),
plan.getAlias());
+ leafMNode.setVersion(version);
+
// the cached mNode may be replaced by new entityMNode in mtree
mNodeCache.invalidate(path.getDevicePath());
@@ -607,6 +615,12 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
* @param plan CreateAlignedTimeSeriesPlan
*/
public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+ createAlignedTimeSeries(plan, null);
+ }
+
+ @Override
+ public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan, List<String> versionList)
+ throws MetadataException {
if (!memoryStatistics.isAllowToCreateNewSeries()) {
throw new SeriesOverflowException();
}
@@ -633,6 +647,12 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
plan.getCompressors(),
plan.getAliasList());
+ if (versionList != null) {
+ for (int i = 0; i < measurements.size(); i++) {
+ measurementMNodeList.get(i).setVersion(versionList.get(i));
+ }
+ }
+
// the cached mNode may be replaced by new entityMNode in mtree
mNodeCache.invalidate(prefixPath);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 86ec1229f0..63e0e61423 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -538,6 +538,13 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
}
}
+ @Override
+ public void createTimeseries(CreateTimeSeriesPlan plan, long offset, String version)
+ throws MetadataException {
+ throw new UnsupportedOperationException(
+ "SchemaRegion schema file mode currently doesn't support timeseries with version");
+ }
+
/**
* Add one timeseries to metadata tree, if the timeseries already exists, throw exception
*
@@ -700,6 +707,13 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
}
}
+ @Override
+ public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan, List<String> versionList)
+ throws MetadataException {
+ throw new UnsupportedOperationException(
+ "SchemaRegion schema file mode currently doesn't support timeseries with version");
+ }
+
/**
* Delete all timeseries matching the given path pattern. If using prefix match, the path pattern
* is used to match prefix path. All timeseries start with the matched prefix path will be
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/TimeseriesVersionUtil.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/TimeseriesVersionUtil.java
new file mode 100644
index 0000000000..6516fff12f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/TimeseriesVersionUtil.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.utils;
+
+import java.util.UUID;
+
+public class TimeseriesVersionUtil {
+
+ public static String generateVersion() {
+ return UUID.randomUUID().toString();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index b6b55c731b..f0f05e0523 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -54,7 +54,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
PhysicalPlan plan = node.accept(new PhysicalPlanTransformer(), new TransformerContext());
- schemaRegion.createTimeseries((CreateTimeSeriesPlan) plan, -1);
+ schemaRegion.createTimeseries((CreateTimeSeriesPlan) plan, -1, node.getVersion());
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
@@ -67,7 +67,8 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
CreateAlignedTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
PhysicalPlan plan = node.accept(new PhysicalPlanTransformer(), new TransformerContext());
- schemaRegion.createAlignedTimeSeries((CreateAlignedTimeSeriesPlan) plan);
+ schemaRegion.createAlignedTimeSeries(
+ (CreateAlignedTimeSeriesPlan) plan, node.getVersionList());
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index a1ee32b437..f2d7bcb8c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -140,11 +140,12 @@ public class SchemaTree {
measurementPath,
(MeasurementSchema) measurementPath.getMeasurementSchema(),
measurementPath.isMeasurementAliasExists() ? measurementPath.getMeasurementAlias() : null,
- measurementPath.isUnderAlignedEntity());
+ measurementPath.isUnderAlignedEntity(),
+ measurementPath.getVersion());
}
public void appendSingleMeasurement(
- PartialPath path, MeasurementSchema schema, String alias, boolean isAligned) {
+ PartialPath path, MeasurementSchema schema, String alias, boolean isAligned, String version) {
String[] nodes = path.getNodes();
SchemaNode cur = root;
SchemaNode child;
@@ -152,7 +153,8 @@ public class SchemaTree {
child = cur.getChild(nodes[i]);
if (child == null) {
if (i == nodes.length - 1) {
- SchemaMeasurementNode measurementNode = new SchemaMeasurementNode(nodes[i], schema);
+ SchemaMeasurementNode measurementNode =
+ new SchemaMeasurementNode(nodes[i], schema, version);
if (alias != null) {
measurementNode.setAlias(alias);
cur.getAsEntityNode().addAliasChild(alias, measurementNode);
@@ -175,6 +177,16 @@ public class SchemaTree {
}
}
+ public void pruneSingleMeasurement(PartialPath path) {
+ String[] nodes = path.getNodes();
+ SchemaNode cur = root;
+ for (int i = 1; i < nodes.length - 1; i++) {
+ cur = cur.getChild(nodes[i]);
+ }
+
+ cur.removeChild(nodes[nodes.length - 1]);
+ }
+
public void mergeSchemaTree(SchemaTree schemaTree) {
traverseAndMerge(this.root, null, schemaTree.root);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java
index a237fb19e1..b2d74ac839 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java
@@ -43,6 +43,11 @@ public class SchemaInternalNode extends SchemaNode {
children.put(name, child);
}
+ @Override
+ public void removeChild(String name) {
+ children.remove(name);
+ }
+
@Override
public void replaceChild(String name, SchemaNode newChild) {
SchemaNode oldChild = children.get(name);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java
index ea9a5d1f19..846a455e4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java
@@ -28,10 +28,12 @@ public class SchemaMeasurementNode extends SchemaNode {
private String alias;
private MeasurementSchema schema;
+ private String version;
- public SchemaMeasurementNode(String name, MeasurementSchema schema) {
+ public SchemaMeasurementNode(String name, MeasurementSchema schema, String version) {
super(name);
this.schema = schema;
+ this.version = version;
}
public String getAlias() {
@@ -66,6 +68,10 @@ public class SchemaMeasurementNode extends SchemaNode {
this.schema = schema;
}
+ public String getVersion() {
+ return version;
+ }
+
@Override
public boolean isMeasurement() {
return true;
@@ -88,14 +94,16 @@ public class SchemaMeasurementNode extends SchemaNode {
ReadWriteIOUtils.write(alias, buffer);
schema.serializeTo(buffer);
+ ReadWriteIOUtils.write(version, buffer);
}
public static SchemaMeasurementNode deserialize(ByteBuffer buffer) {
String name = ReadWriteIOUtils.readString(buffer);
String alias = ReadWriteIOUtils.readString(buffer);
MeasurementSchema schema = MeasurementSchema.deserializeFrom(buffer);
+ String version = ReadWriteIOUtils.readString(buffer);
- SchemaMeasurementNode measurementNode = new SchemaMeasurementNode(name, schema);
+ SchemaMeasurementNode measurementNode = new SchemaMeasurementNode(name, schema, version);
measurementNode.setAlias(alias);
return measurementNode;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaNode.java
index 99e76b0a62..77e3884a7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaNode.java
@@ -48,6 +48,8 @@ public abstract class SchemaNode implements ITreeNode {
public void addChild(String name, SchemaNode child) {}
+ public void removeChild(String name) {}
+
public abstract void replaceChild(String name, SchemaNode newChild);
public abstract void copyDataTo(SchemaNode schemaNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeMeasurementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeMeasurementVisitor.java
index f86a22ccb3..a130420797 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeMeasurementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeMeasurementVisitor.java
@@ -70,15 +70,15 @@ public class SchemaTreeMeasurementVisitor extends SchemaTreeVisitor<MeasurementP
@Override
protected MeasurementPath generateResult() {
+ SchemaMeasurementNode measurementNode = nextMatchedNode.getAsMeasurementNode();
MeasurementPath result =
- new MeasurementPath(
- generateFullPathNodes(nextMatchedNode),
- nextMatchedNode.getAsMeasurementNode().getSchema());
+ new MeasurementPath(generateFullPathNodes(measurementNode), measurementNode.getSchema());
result.setUnderAlignedEntity(ancestorStack.peek().getNode().getAsEntityNode().isAligned());
- String alias = nextMatchedNode.getAsMeasurementNode().getAlias();
+ String alias = measurementNode.getAlias();
if (nodes[nodes.length - 1].equals(alias)) {
result.setMeasurementAlias(alias);
}
+ result.setVersion(measurementNode.getVersion());
return result;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index e2dfe00b0c..31cc03f51b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -61,13 +61,13 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
root.addChild("sg", sg);
SchemaMeasurementNode s1 =
- new SchemaMeasurementNode("s1", new MeasurementSchema("s1", TSDataType.INT32));
+ new SchemaMeasurementNode("s1", new MeasurementSchema("s1", TSDataType.INT32), null);
SchemaMeasurementNode s2 =
- new SchemaMeasurementNode("s2", new MeasurementSchema("s2", TSDataType.DOUBLE));
+ new SchemaMeasurementNode("s2", new MeasurementSchema("s2", TSDataType.DOUBLE), null);
SchemaMeasurementNode s3 =
- new SchemaMeasurementNode("s3", new MeasurementSchema("s3", TSDataType.BOOLEAN));
+ new SchemaMeasurementNode("s3", new MeasurementSchema("s3", TSDataType.BOOLEAN), null);
SchemaMeasurementNode s4 =
- new SchemaMeasurementNode("s4", new MeasurementSchema("s4", TSDataType.TEXT));
+ new SchemaMeasurementNode("s4", new MeasurementSchema("s4", TSDataType.TEXT), null);
s2.setAlias("status");
SchemaEntityNode d1 = new SchemaEntityNode("d1");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
index c0f5b5f412..d3c4cdf3d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
@@ -20,13 +20,21 @@
package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.BatchInsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
public class SchemaValidator {
- private static final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
+ private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+
+ private static final Set<MeasurementPath> BLACKLIST =
+ Collections.synchronizedSet(new HashSet<>());
public static SchemaTree validate(InsertNode insertNode) {
@@ -34,20 +42,28 @@ public class SchemaValidator {
if (insertNode instanceof BatchInsertNode) {
BatchInsertNode batchInsertNode = (BatchInsertNode) insertNode;
schemaTree =
- schemaFetcher.fetchSchemaListWithAutoCreate(
+ SCHEMA_FETCHER.fetchSchemaListWithAutoCreate(
batchInsertNode.getDevicePaths(),
batchInsertNode.getMeasurementsList(),
batchInsertNode.getDataTypesList(),
batchInsertNode.getAlignedList());
} else {
schemaTree =
- schemaFetcher.fetchSchemaWithAutoCreate(
+ SCHEMA_FETCHER.fetchSchemaWithAutoCreate(
insertNode.getDevicePath(),
insertNode.getMeasurements(),
insertNode.getDataTypes(),
insertNode.isAligned());
}
+ if (!BLACKLIST.isEmpty()) {
+ for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
+ if (BLACKLIST.contains(measurementPath)) {
+ schemaTree.pruneSingleMeasurement(measurementPath);
+ }
+ }
+ }
+
if (!insertNode.validateAndSetSchema(schemaTree)) {
throw new SemanticException("Data type mismatch");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 4de3c11ab5..4a7be524f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
+import org.apache.iotdb.db.metadata.utils.TimeseriesVersionUtil;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
@@ -274,13 +275,19 @@ public class LogicalPlanner {
createTimeSeriesStatement.getProps(),
createTimeSeriesStatement.getTags(),
createTimeSeriesStatement.getAttributes(),
- createTimeSeriesStatement.getAlias());
+ createTimeSeriesStatement.getAlias(),
+ TimeseriesVersionUtil.generateVersion());
}
@Override
public PlanNode visitCreateAlignedTimeseries(
CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement,
MPPQueryContext context) {
+ int size = createAlignedTimeSeriesStatement.getMeasurements().size();
+ List<String> versionList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ versionList.add(TimeseriesVersionUtil.generateVersion());
+ }
return new CreateAlignedTimeSeriesNode(
context.getQueryId().genPlanNodeId(),
createAlignedTimeSeriesStatement.getDevicePath(),
@@ -290,7 +297,8 @@ public class LogicalPlanner {
createAlignedTimeSeriesStatement.getCompressors(),
createAlignedTimeSeriesStatement.getAliasList(),
createAlignedTimeSeriesStatement.getTagsList(),
- createAlignedTimeSeriesStatement.getAttributesList());
+ createAlignedTimeSeriesStatement.getAttributesList(),
+ versionList);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index 19ee14da62..20ba9b8813 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -51,7 +51,9 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
private List<String> aliasList;
private List<Map<String, String>> tagsList;
private List<Map<String, String>> attributesList;
- private List<Long> tagOffsets;
+
+ private List<String> versionList;
+
private TRegionReplicaSet regionReplicaSet;
public CreateAlignedTimeSeriesNode(
@@ -63,7 +65,8 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
List<CompressionType> compressors,
List<String> aliasList,
List<Map<String, String>> tagsList,
- List<Map<String, String>> attributesList) {
+ List<Map<String, String>> attributesList,
+ List<String> versionList) {
super(id);
this.devicePath = devicePath;
this.measurements = measurements;
@@ -73,6 +76,7 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
this.aliasList = aliasList;
this.tagsList = tagsList;
this.attributesList = attributesList;
+ this.versionList = versionList;
}
public PartialPath getDevicePath() {
@@ -139,12 +143,8 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
this.attributesList = attributesList;
}
- public List<Long> getTagOffsets() {
- return tagOffsets;
- }
-
- public void setTagOffsets(List<Long> tagOffsets) {
- this.tagOffsets = tagOffsets;
+ public List<String> getVersionList() {
+ return versionList;
}
@Override
@@ -246,6 +246,11 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
}
}
+ List<String> versionList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ versionList.add(ReadWriteIOUtils.readString(byteBuffer));
+ }
+
id = ReadWriteIOUtils.readString(byteBuffer);
return new CreateAlignedTimeSeriesNode(
@@ -257,7 +262,8 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
compressors,
aliasList,
tagsList,
- attributesList);
+ attributesList,
+ versionList);
}
@Override
@@ -275,7 +281,6 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
&& Objects.equals(dataTypes, that.dataTypes)
&& Objects.equals(encodings, that.encodings)
&& Objects.equals(compressors, that.compressors)
- && Objects.equals(tagOffsets, that.tagOffsets)
&& Objects.equals(aliasList, that.aliasList)
&& Objects.equals(tagsList, that.tagsList)
&& Objects.equals(attributesList, that.attributesList);
@@ -344,6 +349,10 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
ReadWriteIOUtils.write(attributes, byteBuffer);
}
}
+
+ for (String version : versionList) {
+ ReadWriteIOUtils.write(version, byteBuffer);
+ }
}
public int hashCode() {
@@ -354,7 +363,6 @@ public class CreateAlignedTimeSeriesNode extends WritePlanNode {
dataTypes,
encodings,
compressors,
- tagOffsets,
aliasList,
tagsList,
attributesList);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 9a0c170b91..dbff07a15b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -52,7 +52,8 @@ public class CreateTimeSeriesNode extends WritePlanNode {
private Map<String, String> props = null;
private Map<String, String> tags = null;
private Map<String, String> attributes = null;
- private long tagOffset = -1;
+
+ private String version;
private TRegionReplicaSet regionReplicaSet;
@@ -65,7 +66,8 @@ public class CreateTimeSeriesNode extends WritePlanNode {
Map<String, String> props,
Map<String, String> tags,
Map<String, String> attributes,
- String alias) {
+ String alias,
+ String version) {
super(id);
this.path = path;
this.dataType = dataType;
@@ -78,6 +80,7 @@ public class CreateTimeSeriesNode extends WritePlanNode {
this.props = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
this.props.putAll(props);
}
+ this.version = version;
}
public PartialPath getPath() {
@@ -144,12 +147,8 @@ public class CreateTimeSeriesNode extends WritePlanNode {
this.props = props;
}
- public long getTagOffset() {
- return tagOffset;
- }
-
- public void setTagOffset(long tagOffset) {
- this.tagOffset = tagOffset;
+ public String getVersion() {
+ return version;
}
@Override
@@ -181,7 +180,6 @@ public class CreateTimeSeriesNode extends WritePlanNode {
TSDataType dataType;
TSEncoding encoding;
CompressionType compressor;
- long tagOffset;
String alias = null;
Map<String, String> props = null;
Map<String, String> tags = null;
@@ -198,7 +196,6 @@ public class CreateTimeSeriesNode extends WritePlanNode {
dataType = TSDataType.values()[byteBuffer.get()];
encoding = TSEncoding.values()[byteBuffer.get()];
compressor = CompressionType.values()[byteBuffer.get()];
- tagOffset = byteBuffer.getLong();
// alias
if (byteBuffer.get() == 1) {
@@ -229,9 +226,20 @@ public class CreateTimeSeriesNode extends WritePlanNode {
attributes = ReadWriteIOUtils.readMap(byteBuffer);
}
+ String version = ReadWriteIOUtils.readString(byteBuffer);
+
id = ReadWriteIOUtils.readString(byteBuffer);
return new CreateTimeSeriesNode(
- new PlanNodeId(id), path, dataType, encoding, compressor, props, tags, attributes, alias);
+ new PlanNodeId(id),
+ path,
+ dataType,
+ encoding,
+ compressor,
+ props,
+ tags,
+ attributes,
+ alias,
+ version);
}
@Override
@@ -244,7 +252,6 @@ public class CreateTimeSeriesNode extends WritePlanNode {
byteBuffer.put((byte) dataType.ordinal());
byteBuffer.put((byte) encoding.ordinal());
byteBuffer.put((byte) compressor.ordinal());
- byteBuffer.putLong(tagOffset);
// alias
if (alias != null) {
@@ -283,6 +290,8 @@ public class CreateTimeSeriesNode extends WritePlanNode {
byteBuffer.put((byte) 1);
ReadWriteIOUtils.write(attributes, byteBuffer);
}
+
+ ReadWriteIOUtils.write(version, byteBuffer);
}
@Override
@@ -301,8 +310,7 @@ public class CreateTimeSeriesNode extends WritePlanNode {
return false;
}
CreateTimeSeriesNode that = (CreateTimeSeriesNode) o;
- return tagOffset == that.tagOffset
- && path.equals(that.path)
+ return path.equals(that.path)
&& dataType == that.dataType
&& encoding == that.encoding
&& compressor == that.compressor
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index 5e6160c4e8..1c304ce8fb 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -66,8 +66,8 @@ public class DataNodeSchemaCacheTest {
o ->
new SchemaCacheEntry(
(MeasurementSchema) o.getMeasurementSchema(),
- null,
- o.isUnderAlignedEntity())));
+ o.isUnderAlignedEntity(),
+ null)));
Assert.assertEquals(
TSDataType.INT32,
schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s1")).getTsDataType());
@@ -94,8 +94,8 @@ public class DataNodeSchemaCacheTest {
o ->
new SchemaCacheEntry(
(MeasurementSchema) o.getMeasurementSchema(),
- null,
- o.isUnderAlignedEntity())));
+ o.isUnderAlignedEntity(),
+ null)));
Assert.assertEquals(
TSDataType.BOOLEAN,
schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
@@ -182,17 +182,20 @@ public class DataNodeSchemaCacheTest {
new PartialPath("root.sg1.d1.s1"),
new MeasurementSchema("s1", TSDataType.INT32),
null,
- false);
+ false,
+ null);
schemaTree.appendSingleMeasurement(
new PartialPath("root.sg1.d1.s2"),
new MeasurementSchema("s2", TSDataType.FLOAT),
null,
- false);
+ false,
+ null);
schemaTree.appendSingleMeasurement(
new PartialPath("root.sg1.d1.s3"),
new MeasurementSchema("s3", TSDataType.BOOLEAN),
null,
- false);
+ false,
+ null);
return schemaTree;
}
@@ -204,17 +207,20 @@ public class DataNodeSchemaCacheTest {
new PartialPath("root.sg1.d1.s3"),
new MeasurementSchema("s3", TSDataType.BOOLEAN),
null,
- false);
+ false,
+ null);
schemaTree.appendSingleMeasurement(
new PartialPath("root.sg1.d1.s4"),
new MeasurementSchema("s4", TSDataType.TEXT),
null,
- false);
+ false,
+ null);
schemaTree.appendSingleMeasurement(
new PartialPath("root.sg1.d1.s5"),
new MeasurementSchema("s5", TSDataType.INT64),
null,
- false);
+ false,
+ null);
return schemaTree;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTreeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTreeTest.java
index 23a5446c91..cd1956ed1c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTreeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTreeTest.java
@@ -275,9 +275,9 @@ public class SchemaTreeTest {
MeasurementSchema schema1 = new MeasurementSchema("s1", TSDataType.INT32);
MeasurementSchema schema2 = new MeasurementSchema("s2", TSDataType.INT64);
- SchemaMeasurementNode s1 = new SchemaMeasurementNode("s1", schema1);
+ SchemaMeasurementNode s1 = new SchemaMeasurementNode("s1", schema1, null);
d1.addChild("s1", s1);
- SchemaMeasurementNode s2 = new SchemaMeasurementNode("s2", schema2);
+ SchemaMeasurementNode s2 = new SchemaMeasurementNode("s2", schema2, null);
s2.setAlias("status");
d1.addChild("s2", s2);
d1.addAliasChild("status", s2);
@@ -313,7 +313,7 @@ public class SchemaTreeTest {
SchemaNode s;
for (int i = 0; i < 5; i++) {
a = new SchemaEntityNode("a");
- s = new SchemaMeasurementNode("s", schema);
+ s = new SchemaMeasurementNode("s", schema, null);
a.addChild("s", s);
parent.addChild("a", a);
parent = a;
@@ -333,7 +333,7 @@ public class SchemaTreeTest {
for (int i = 0; i < 2; i++) {
c = new SchemaEntityNode("c");
- c.addChild("s1", new SchemaMeasurementNode("s1", schema));
+ c.addChild("s1", new SchemaMeasurementNode("s1", schema, null));
parent.addChild("c", c);
parent = c;
}
@@ -527,6 +527,22 @@ public class SchemaTreeTest {
return measurementPathList;
}
+ @Test
+ public void testPruneMeasurement() throws Exception {
+ SchemaNode root = generateSchemaTree();
+ SchemaTree schemaTree = new SchemaTree(root);
+
+ Pair<List<MeasurementPath>, Integer> result =
+ schemaTree.searchMeasurementPaths(new PartialPath("root.sg.d1.s2"));
+ Assert.assertEquals(1, result.left.size());
+ Assert.assertEquals("root.sg.d1.s2", result.left.get(0).getFullPath());
+
+ schemaTree.pruneSingleMeasurement(new PartialPath("root.sg.d1.s2"));
+
+ result = schemaTree.searchMeasurementPaths(new PartialPath("root.sg.d1.s2"));
+ Assert.assertTrue(result.left.isEmpty());
+ }
+
@Test
public void testMergeSchemaTree() throws Exception {
SchemaTree schemaTree = new SchemaTree();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
index 4b204bdd70..14b12c02f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
@@ -101,6 +101,9 @@ public class SchemaFetchScanOperatorTest {
Assert.assertEquals(
Arrays.asList("root.sg.d1.s2", "root.sg.d2.a.s2", "root.sg.d2.s2"),
pair.left.stream().map(MeasurementPath::getFullPath).collect(Collectors.toList()));
+ Assert.assertEquals(
+ Arrays.asList("0", "2", "1"),
+ pair.left.stream().map(MeasurementPath::getVersion).collect(Collectors.toList()));
}
private ISchemaRegion prepareSchemaRegion() throws Exception {
@@ -126,10 +129,10 @@ public class SchemaFetchScanOperatorTest {
createTimeSeriesPlan.setAlias("status");
createTimeSeriesPlan.setPath(new PartialPath("root.sg.d1.s2"));
- schemaRegion.createTimeseries(createTimeSeriesPlan, -1);
+ schemaRegion.createTimeseries(createTimeSeriesPlan, -1, "0");
createTimeSeriesPlan.setPath(new PartialPath("root.sg.d2.s2"));
- schemaRegion.createTimeseries(createTimeSeriesPlan, -1);
+ schemaRegion.createTimeseries(createTimeSeriesPlan, -1, "1");
CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
new CreateAlignedTimeSeriesPlan(
@@ -142,7 +145,7 @@ public class SchemaFetchScanOperatorTest {
Collections.emptyList(),
Collections.emptyList());
- schemaRegion.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+ schemaRegion.createAlignedTimeSeries(createAlignedTimeSeriesPlan, Arrays.asList(null, "2"));
return schemaRegion;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 3fafea7cfd..c8a7c972ab 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.ConsensusImpl;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
+import org.apache.iotdb.db.metadata.utils.TimeseriesVersionUtil;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -133,7 +134,8 @@ public class InternalServiceImplTest {
put("attr2", "a2");
}
},
- "meter1");
+ "meter1",
+ TimeseriesVersionUtil.generateVersion());
TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode);
@@ -221,6 +223,12 @@ public class InternalServiceImplTest {
});
add(null);
}
+ },
+ new ArrayList<String>() {
+ {
+ add(TimeseriesVersionUtil.generateVersion());
+ add(TimeseriesVersionUtil.generateVersion());
+ }
});
TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();