You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/19 01:18:53 UTC
[iotdb] branch master updated: [IOTDB-4815] Apply SchemaCache for explicit timeseries query
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bf49535e88 [IOTDB-4815] Apply SchemaCache for explicit timeseries query
bf49535e88 is described below
commit bf49535e88090cf9471fbda5ea22f499604974b7
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Sat Nov 19 09:18:46 2022 +0800
[IOTDB-4815] Apply SchemaCache for explicit timeseries query
---
.../db/metadata/cache/DataNodeSchemaCache.java | 47 +++++--
.../iotdb/db/metadata/cache/SchemaCacheEntry.java | 12 +-
.../mpp/common/schematree/ClusterSchemaTree.java | 23 ++--
.../common/schematree/DeviceGroupSchemaTree.java | 7 +-
.../db/mpp/common/schematree/ISchemaTree.java | 7 +-
.../execution/executor/RegionWriteExecutor.java | 11 +-
.../process/last/UpdateLastCacheOperator.java | 15 ++-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 9 +-
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 149 +++++++++++++++------
.../mpp/plan/analyze/StandaloneSchemaFetcher.java | 2 +-
.../db/metadata/cache/DataNodeSchemaCacheTest.java | 7 +-
.../mpp/execution/operator/OperatorMemoryTest.java | 2 +-
.../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 2 +-
.../iotdb/db/mpp/plan/plan/distribution/Util.java | 2 +-
14 files changed, 210 insertions(+), 85 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index bec73aeda7..c5614e34eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -34,6 +34,9 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@@ -100,34 +103,60 @@ public class DataNodeSchemaCache {
*/
public ClusterSchemaTree get(PartialPath devicePath, String[] measurements) {
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+ Set<String> storageGroupSet = new HashSet<>();
SchemaCacheEntry schemaCacheEntry;
for (String measurement : measurements) {
PartialPath path = devicePath.concatNode(measurement);
schemaCacheEntry = cache.getIfPresent(path);
if (schemaCacheEntry != null) {
schemaTree.appendSingleMeasurement(
- devicePath.concatNode(
- schemaCacheEntry.getSchemaEntryId()), // the cached path may be alias path
+ devicePath.concatNode(schemaCacheEntry.getSchemaEntryId()),
schemaCacheEntry.getMeasurementSchema(),
schemaCacheEntry.getTagMap(),
null,
schemaCacheEntry.isAligned());
+ storageGroupSet.add(schemaCacheEntry.getStorageGroup());
}
}
+ schemaTree.setDatabases(storageGroupSet);
+ return schemaTree;
+ }
+
+ public ClusterSchemaTree get(PartialPath fullPath) {
+ ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+ SchemaCacheEntry schemaCacheEntry = cache.getIfPresent(fullPath);
+ if (schemaCacheEntry != null) {
+ schemaTree.appendSingleMeasurement(
+ fullPath,
+ schemaCacheEntry.getMeasurementSchema(),
+ schemaCacheEntry.getTagMap(),
+ null,
+ schemaCacheEntry.isAligned());
+ schemaTree.setDatabases(Collections.singleton(schemaCacheEntry.getStorageGroup()));
+ }
return schemaTree;
}
public void put(ISchemaTree schemaTree) {
for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
- SchemaCacheEntry schemaCacheEntry =
- new SchemaCacheEntry(
- (MeasurementSchema) measurementPath.getMeasurementSchema(),
- measurementPath.getTagMap(),
- measurementPath.isUnderAlignedEntity());
- cache.put(new PartialPath(measurementPath.getNodes()), schemaCacheEntry);
+ putSingleMeasurementPath(schemaTree.getBelongedDatabase(measurementPath), measurementPath);
}
}
+ public void put(String storageGroup, MeasurementPath measurementPath) {
+ putSingleMeasurementPath(storageGroup, measurementPath);
+ }
+
+ private void putSingleMeasurementPath(String storageGroup, MeasurementPath measurementPath) {
+ SchemaCacheEntry schemaCacheEntry =
+ new SchemaCacheEntry(
+ storageGroup,
+ (MeasurementSchema) measurementPath.getMeasurementSchema(),
+ measurementPath.getTagMap(),
+ measurementPath.isUnderAlignedEntity());
+ cache.put(new PartialPath(measurementPath.getNodes()), schemaCacheEntry);
+ }
+
public TimeValuePair getLastCache(PartialPath seriesPath) {
SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
if (null == entry) {
@@ -157,6 +186,7 @@ public class DataNodeSchemaCache {
* aligned sensor without only one sub sensor
*/
public void updateLastCache(
+ String storageGroup,
MeasurementPath measurementPath,
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
@@ -169,6 +199,7 @@ public class DataNodeSchemaCache {
if (null == entry) {
entry =
new SchemaCacheEntry(
+ storageGroup,
(MeasurementSchema) measurementPath.getMeasurementSchema(),
measurementPath.getTagMap(),
measurementPath.isUnderAlignedEntity());
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
index 9b0fa929d7..66c49e3dc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -28,6 +28,8 @@ import java.util.Map;
public class SchemaCacheEntry {
+ private final String storageGroup;
+
private final MeasurementSchema measurementSchema;
private final Map<String, String> tagMap;
@@ -36,7 +38,11 @@ public class SchemaCacheEntry {
private volatile ILastCacheContainer lastCacheContainer = null;
SchemaCacheEntry(
- MeasurementSchema measurementSchema, Map<String, String> tagMap, boolean isAligned) {
+ String storageGroup,
+ MeasurementSchema measurementSchema,
+ Map<String, String> tagMap,
+ boolean isAligned) {
+ this.storageGroup = storageGroup.intern();
this.measurementSchema = measurementSchema;
this.isAligned = isAligned;
this.tagMap = tagMap;
@@ -46,6 +52,10 @@ public class SchemaCacheEntry {
return measurementSchema.getMeasurementId();
}
+ public String getStorageGroup() {
+ return storageGroup;
+ }
+
public MeasurementSchema getMeasurementSchema() {
return measurementSchema;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index 1e52edb485..d1581176ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_MATCH_PATTERN;
@@ -50,7 +51,7 @@ import static org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode.SCHEMA_M
public class ClusterSchemaTree implements ISchemaTree {
- private List<String> storageGroups;
+ private Set<String> databases;
private final SchemaNode root;
@@ -292,27 +293,27 @@ public class ClusterSchemaTree implements ISchemaTree {
* @return database in the given path
*/
@Override
- public String getBelongedStorageGroup(String pathName) {
- for (String storageGroup : storageGroups) {
- if (PathUtils.isStartWith(pathName, storageGroup)) {
- return storageGroup;
+ public String getBelongedDatabase(String pathName) {
+ for (String database : databases) {
+ if (PathUtils.isStartWith(pathName, database)) {
+ return database;
}
}
throw new RuntimeException("No matched database. Please check the path " + pathName);
}
@Override
- public String getBelongedStorageGroup(PartialPath path) {
- return getBelongedStorageGroup(path.getFullPath());
+ public String getBelongedDatabase(PartialPath path) {
+ return getBelongedDatabase(path.getFullPath());
}
@Override
- public List<String> getStorageGroups() {
- return storageGroups;
+ public Set<String> getDatabases() {
+ return databases;
}
- public void setStorageGroups(List<String> storageGroups) {
- this.storageGroups = storageGroups;
+ public void setDatabases(Set<String> databases) {
+ this.databases = databases;
}
@TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java
index 1c739ea9fe..a8a64f38f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* This class is specifically for standalone schema validation during data insertion. Since the
@@ -82,17 +83,17 @@ public class DeviceGroupSchemaTree implements ISchemaTree {
}
@Override
- public String getBelongedStorageGroup(String pathName) {
+ public String getBelongedDatabase(String pathName) {
throw new UnsupportedOperationException();
}
@Override
- public String getBelongedStorageGroup(PartialPath path) {
+ public String getBelongedDatabase(PartialPath path) {
throw new UnsupportedOperationException();
}
@Override
- public List<String> getStorageGroups() {
+ public Set<String> getDatabases() {
throw new UnsupportedOperationException();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ISchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ISchemaTree.java
index acd383286a..010443a63a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ISchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ISchemaTree.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.List;
+import java.util.Set;
public interface ISchemaTree {
/**
@@ -60,11 +61,11 @@ public interface ISchemaTree {
* @param pathName only full path, cannot be path pattern
* @return database in the given path
*/
- String getBelongedStorageGroup(String pathName);
+ String getBelongedDatabase(String pathName);
- String getBelongedStorageGroup(PartialPath path);
+ String getBelongedDatabase(PartialPath path);
- List<String> getStorageGroups();
+ Set<String> getDatabases();
boolean isEmpty();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index e63309b26c..0d1d7804c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
@@ -32,6 +33,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
+import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
@@ -423,7 +425,10 @@ public class RegionWriteExecutor {
failingMeasurement.getValue().getMessage());
alreadyExistingStatus.add(
RpcUtils.getStatus(
- metadataException.getErrorCode(), metadataException.getMessage()));
+ metadataException.getErrorCode(),
+ MeasurementPath.transformDataToString(
+ ((MeasurementAlreadyExistException) metadataException)
+ .getMeasurementPath())));
} else {
LOGGER.error("Metadata error: ", metadataException);
failingStatus.add(
@@ -469,9 +474,9 @@ public class RegionWriteExecutor {
TSStatus status;
if (failingStatus.isEmpty()) {
- status = RpcUtils.getStatus(failingStatus);
- } else {
status = RpcUtils.getStatus(alreadyExistingStatus);
+ } else {
+ status = RpcUtils.getStatus(failingStatus);
}
RegionExecutionResult result = new RegionExecutionResult();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index abd43eb5e0..8afd9b99de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
@@ -58,6 +59,8 @@ public class UpdateLastCacheOperator implements ProcessOperator {
private final TsBlockBuilder tsBlockBuilder;
+ private String databaseName;
+
public UpdateLastCacheOperator(
OperatorContext operatorContext,
Operator child,
@@ -106,7 +109,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
if (needUpdateCache) {
TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
- lastCache.updateLastCache(fullPath, timeValuePair, false, Long.MIN_VALUE);
+ lastCache.updateLastCache(getDatabaseName(), fullPath, timeValuePair, false, Long.MIN_VALUE);
}
tsBlockBuilder.reset();
@@ -117,6 +120,16 @@ public class UpdateLastCacheOperator implements ProcessOperator {
return tsBlockBuilder.build();
}
+ private String getDatabaseName() {
+ if (databaseName == null) {
+ databaseName =
+ ((DataDriverContext) operatorContext.getInstanceContext().getDriverContext())
+ .getDataRegion()
+ .getStorageGroupName();
+ }
+ return databaseName;
+ }
+
@Override
public boolean hasNext() {
return child.hasNext();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 7028955335..19169af3b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -359,7 +359,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
queryParam.setDevicePath(devicePath);
sgNameToQueryParamsMap
- .computeIfAbsent(schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
+ .computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
.add(queryParam);
}
DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
@@ -1104,7 +1104,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
queryParam.setDevicePath(devicePath);
sgNameToQueryParamsMap
- .computeIfAbsent(schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
+ .computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
.add(queryParam);
}
return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
@@ -1914,8 +1914,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
queryParam.setDevicePath(devicePath);
sgNameToQueryParamsMap
- .computeIfAbsent(
- schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
+ .computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
.add(queryParam);
}
DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
@@ -2152,7 +2151,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
queryParam.setDevicePath(devicePath.getFullPath());
sgNameToQueryParamsMap
.computeIfAbsent(
- schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
+ schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
.add(queryParam);
});
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 31cc8c8deb..16a49ae47a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -106,10 +106,69 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean withTags) {
Map<Integer, Template> templateMap = new HashMap<>();
patternTree.constructTree();
- for (PartialPath pattern : patternTree.getAllPathPatterns()) {
+ List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
+ for (PartialPath pattern : pathPatternList) {
templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
}
- return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+
+ if (withTags) {
+ return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+ }
+
+ List<PartialPath> fullPathList = new ArrayList<>();
+ for (PartialPath pattern : pathPatternList) {
+ if (!pattern.hasWildcard()) {
+ fullPathList.add(pattern);
+ }
+ }
+
+ if (fullPathList.isEmpty()) {
+ return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+ }
+
+ // The schema cache R/W and fetch operation must be locked together thus the cache clean
+ // operation executed by delete timeseries will be effective.
+ schemaCache.takeReadLock();
+ try {
+ ClusterSchemaTree schemaTree;
+ if (fullPathList.size() == pathPatternList.size()) {
+ boolean isAllCached = true;
+ schemaTree = new ClusterSchemaTree();
+ ClusterSchemaTree cachedSchema;
+ Set<String> storageGroupSet = new HashSet<>();
+ for (PartialPath fullPath : fullPathList) {
+ cachedSchema = schemaCache.get(fullPath);
+ if (cachedSchema.isEmpty()) {
+ isAllCached = false;
+ break;
+ } else {
+ schemaTree.mergeSchemaTree(cachedSchema);
+ storageGroupSet.addAll(cachedSchema.getDatabases());
+ }
+ }
+ if (isAllCached) {
+ schemaTree.setDatabases(storageGroupSet);
+ return schemaTree;
+ }
+ }
+
+ schemaTree =
+ executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+
+ // only cache the schema fetched by full path
+ List<MeasurementPath> measurementPathList;
+ for (PartialPath fullPath : fullPathList) {
+ measurementPathList = schemaTree.searchMeasurementPaths(fullPath).left;
+ if (measurementPathList.isEmpty()) {
+ continue;
+ }
+ schemaCache.put(
+ schemaTree.getBelongedDatabase(measurementPathList.get(0)), measurementPathList.get(0));
+ }
+ return schemaTree;
+ } finally {
+ schemaCache.releaseReadLock();
+ }
}
private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
@@ -132,7 +191,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
ClusterSchemaTree result = new ClusterSchemaTree();
- List<String> storageGroupList = new ArrayList<>();
+ Set<String> databaseSet = new HashSet<>();
while (coordinator.getQueryExecution(queryId).hasNextResult()) {
// The query will be transited to FINISHED when invoking getBatchResult() at the last time
// So we don't need to clean up it manually
@@ -147,10 +206,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
Column column = tsBlock.get().getColumn(0);
for (int i = 0; i < column.getPositionCount(); i++) {
- parseFetchedData(column.getBinary(i), result, storageGroupList);
+ parseFetchedData(column.getBinary(i), result, databaseSet);
}
}
- result.setStorageGroups(storageGroupList);
+ result.setDatabases(databaseSet);
return result;
}
} finally {
@@ -159,14 +218,14 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
private void parseFetchedData(
- Binary data, ClusterSchemaTree resultSchemaTree, List<String> storageGroupList) {
+ Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
InputStream inputStream = new ByteArrayInputStream(data.getValues());
try {
byte type = ReadWriteIOUtils.readByte(inputStream);
if (type == 0) {
int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; i++) {
- storageGroupList.add(ReadWriteIOUtils.readString(inputStream));
+ databaseSet.add(ReadWriteIOUtils.readString(inputStream));
}
} else if (type == 1) {
resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
@@ -185,21 +244,24 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
String[] measurements,
Function<Integer, TSDataType> getDataType,
boolean isAligned) {
+ // The schema cache R/W and fetch operation must be locked together thus the cache clean
+ // operation executed by delete timeseries will be effective.
schemaCache.takeReadLock();
try {
ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
List<Integer> indexOfMissingMeasurements =
checkMissingMeasurements(schemaTree, devicePath, measurements);
+ // all schema can be taken from cache
if (indexOfMissingMeasurements.isEmpty()) {
return schemaTree;
}
+ // try fetch the missing schema from remote and cache fetched schema
PathPatternTree patternTree = new PathPatternTree();
for (int index : indexOfMissingMeasurements) {
patternTree.appendFullPath(devicePath, measurements[index]);
}
-
ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
if (!remoteSchemaTree.isEmpty()) {
schemaTree.mergeSchemaTree(remoteSchemaTree);
@@ -210,19 +272,16 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
return schemaTree;
}
- ClusterSchemaTree missingSchemaTree =
- checkAndAutoCreateMissingMeasurements(
- remoteSchemaTree,
- devicePath,
- indexOfMissingMeasurements,
- measurements,
- getDataType,
- null,
- null,
- isAligned);
-
- schemaTree.mergeSchemaTree(missingSchemaTree);
- schemaCache.put(missingSchemaTree);
+ // auto create the still missing schema and merge them into schemaTree
+ checkAndAutoCreateMissingMeasurements(
+ schemaTree,
+ devicePath,
+ indexOfMissingMeasurements,
+ measurements,
+ getDataType,
+ null,
+ null,
+ isAligned);
return schemaTree;
} finally {
@@ -248,8 +307,9 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
List<TSEncoding[]> encodingsList,
List<CompressionType[]> compressionTypesList,
List<Boolean> isAlignedList) {
+ // The schema cache R/W and fetch operation must be locked together thus the cache clean
+ // operation executed by delete timeseries will be effective.
schemaCache.takeReadLock();
-
try {
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
PathPatternTree patternTree = new PathPatternTree();
@@ -264,10 +324,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
}
+ // all schema can be taken from cache
if (patternTree.isEmpty()) {
return schemaTree;
}
+ // try fetch the missing schema from remote and cache fetched schema
ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
if (!remoteSchemaTree.isEmpty()) {
schemaTree.mergeSchemaTree(remoteSchemaTree);
@@ -278,21 +340,18 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
return schemaTree;
}
- ClusterSchemaTree missingSchemaTree;
+ // auto create the still missing schema and merge them into schemaTree
for (int i = 0; i < devicePathList.size(); i++) {
int finalI = i;
- missingSchemaTree =
- checkAndAutoCreateMissingMeasurements(
- schemaTree,
- devicePathList.get(i),
- indexOfMissingMeasurementsList.get(i),
- measurementsList.get(i),
- index -> tsDataTypesList.get(finalI)[index],
- encodingsList == null ? null : encodingsList.get(i),
- compressionTypesList == null ? null : compressionTypesList.get(i),
- isAlignedList.get(i));
- schemaTree.mergeSchemaTree(missingSchemaTree);
- schemaCache.put(missingSchemaTree);
+ checkAndAutoCreateMissingMeasurements(
+ schemaTree,
+ devicePathList.get(i),
+ indexOfMissingMeasurementsList.get(i),
+ measurementsList.get(i),
+ index -> tsDataTypesList.get(finalI)[index],
+ encodingsList == null ? null : encodingsList.get(i),
+ compressionTypesList == null ? null : compressionTypesList.get(i),
+ isAlignedList.get(i));
}
return schemaTree;
} finally {
@@ -315,7 +374,9 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
return templateManager.getAllPathsSetTemplate(templateName);
}
- private ClusterSchemaTree checkAndAutoCreateMissingMeasurements(
+ // check which measurements are missing and auto create the missing measurements and merge them
+ // into given schemaTree
+ private void checkAndAutoCreateMissingMeasurements(
ClusterSchemaTree schemaTree,
PartialPath devicePath,
List<Integer> indexOfMissingMeasurements,
@@ -324,6 +385,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
TSEncoding[] encodings,
CompressionType[] compressionTypes,
boolean isAligned) {
+ // check missing measurements
DeviceSchemaInfo deviceSchemaInfo =
schemaTree.searchDeviceSchemaInfo(
devicePath,
@@ -340,13 +402,11 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
}
}
-
- ClusterSchemaTree reFetchedSchemaTree = new ClusterSchemaTree();
-
if (indexOfMissingMeasurements.isEmpty()) {
- return reFetchedSchemaTree;
+ return;
}
+ // check whether there is template should be activated
Pair<Template, PartialPath> templateInfo = templateManager.checkTemplateSetInfo(devicePath);
if (templateInfo != null) {
Template template = templateInfo.left;
@@ -377,11 +437,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
if (indexOfMissingMeasurements.isEmpty()) {
- return schemaTree;
+ return;
}
}
}
+ // auto create the rest missing timeseries
List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
List<TSDataType> dataTypesOfMissingMeasurement =
new ArrayList<>(indexOfMissingMeasurements.size());
@@ -416,7 +477,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
compressionTypesOfMissingMeasurement,
isAligned));
}
- return schemaTree;
}
private List<Integer> checkMissingMeasurements(
@@ -438,6 +498,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
return indexOfMissingMeasurements;
}
+ // try to create the target timeseries and return schemaTree involving successfully created
+ // timeseries and existing timeseries
private ClusterSchemaTree internalCreateTimeseries(
PartialPath devicePath,
List<String> measurements,
@@ -472,11 +534,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
isAligned);
}
- schemaCache.put(schemaTree);
-
return schemaTree;
}
+ // auto create timeseries and return the existing timeseries info
private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
InternalCreateTimeSeriesStatement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index 72f10eaa87..f2ab109b79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -88,7 +88,7 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
} catch (MetadataException e) {
throw new RuntimeException(e);
}
- schemaTree.setStorageGroups(new ArrayList<>(storageGroupSet));
+ schemaTree.setDatabases(storageGroupSet);
return schemaTree;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index a581cdd867..bee7c16e6b 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@@ -70,6 +71,7 @@ public class DataNodeSchemaCacheTest {
o -> new PartialPath(o.getNodes()),
o ->
new SchemaCacheEntry(
+ "root.sg1",
(MeasurementSchema) o.getMeasurementSchema(),
o.getTagMap(),
o.isUnderAlignedEntity())));
@@ -102,6 +104,7 @@ public class DataNodeSchemaCacheTest {
o -> new PartialPath(o.getNodes()),
o ->
new SchemaCacheEntry(
+ "root.sg1",
(MeasurementSchema) o.getMeasurementSchema(),
o.getTagMap(),
o.isUnderAlignedEntity())));
@@ -209,7 +212,7 @@ public class DataNodeSchemaCacheTest {
null,
null,
false);
-
+ schemaTree.setDatabases(Collections.singleton("root.sg1"));
return schemaTree;
}
@@ -234,7 +237,7 @@ public class DataNodeSchemaCacheTest {
null,
null,
false);
-
+ schemaTree.setDatabases(Collections.singleton("root.sg1"));
return schemaTree;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index c1d7728804..9a3f34b3a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -492,7 +492,7 @@ public class OperatorMemoryTest {
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
UpdateLastCacheOperator updateLastCacheOperator =
- new UpdateLastCacheOperator(null, child, null, TSDataType.BOOLEAN, null, true);
+ new UpdateLastCacheOperator(null, child, null, TSDataType.BOOLEAN, null, false);
assertEquals(2048, updateLastCacheOperator.calculateMaxPeekMemory());
assertEquals(1024, updateLastCacheOperator.calculateMaxReturnSize());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 1a121354ca..61951feee4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -45,7 +45,7 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
@Override
public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
- schemaTree.setStorageGroups(Collections.singletonList("root.sg"));
+ schemaTree.setDatabases(Collections.singleton("root.sg"));
return schemaTree;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index e97d6340cd..654e987931 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -275,7 +275,7 @@ public class Util {
d6.addChild("s2", s2);
ClusterSchemaTree tree = new ClusterSchemaTree(root);
- tree.setStorageGroups(Collections.singletonList("root.sg"));
+ tree.setDatabases(Collections.singleton("root.sg"));
return tree;
}