You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/04/25 00:48:20 UTC
[iotdb] branch master updated: Combine DataNodeSchemaCache of Template and Non-Template Scenarios (#9687)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 969995276b Combine DataNodeSchemaCache of Template and Non-Template Scenarios (#9687)
969995276b is described below
commit 969995276b0f681b45545d42e6a910ccc99f5284
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Tue Apr 25 08:48:14 2023 +0800
Combine DataNodeSchemaCache of Template and Non-Template Scenarios (#9687)
---
.../db/metadata/cache/DataNodeSchemaCache.java | 205 +++++++--------------
...he.java => DeviceUsingTemplateSchemaCache.java} | 45 ++---
...SchemaCache.java => TimeSeriesSchemaCache.java} | 44 +----
.../plan/analyze/schema/ClusterSchemaFetcher.java | 56 +-----
.../plan/analyze/schema/NormalSchemaFetcher.java | 5 +-
.../plan/analyze/schema/TemplateSchemaFetcher.java | 10 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 8 -
7 files changed, 96 insertions(+), 277 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 778a0ea2f8..5b5d3d5865 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -24,24 +24,21 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCache;
-import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCacheComputation;
-import org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCacheBuilder;
-import org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCachePolicy;
+import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
+import org.apache.iotdb.db.metadata.template.ITemplateManager;
+import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaComputation;
import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@@ -53,28 +50,27 @@ public class DataNodeSchemaCache {
private static final Logger logger = LoggerFactory.getLogger(DataNodeSchemaCache.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private final IDualKeyCache<PartialPath, String, SchemaCacheEntry> dualKeyCache;
+ private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
+
+ private final DeviceUsingTemplateSchemaCache deviceUsingTemplateSchemaCache;
+
+ private final TimeSeriesSchemaCache timeSeriesSchemaCache;
// cache update or clean have higher priority than cache read
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
private DataNodeSchemaCache() {
- DualKeyCacheBuilder<PartialPath, String, SchemaCacheEntry> dualKeyCacheBuilder =
- new DualKeyCacheBuilder<>();
- dualKeyCache =
- dualKeyCacheBuilder
- .cacheEvictionPolicy(DualKeyCachePolicy.LRU)
- .memoryCapacity(config.getAllocateMemoryForSchemaCache())
- .firstKeySizeComputer(PartialPath::estimateSize)
- .secondKeySizeComputer(s -> 32 + 2 * s.length())
- .valueSizeComputer(SchemaCacheEntry::estimateSize)
- .build();
+ deviceUsingTemplateSchemaCache = new DeviceUsingTemplateSchemaCache(templateManager);
+ timeSeriesSchemaCache = new TimeSeriesSchemaCache();
MetricService.getInstance().addMetricSet(new DataNodeSchemaCacheMetrics(this));
}
public double getHitRate() {
- return dualKeyCache.stats().hitRate() * 100;
+ return (deviceUsingTemplateSchemaCache.getHitCount() + timeSeriesSchemaCache.getHitCount())
+ * 1.0
+ / (deviceUsingTemplateSchemaCache.getRequestCount()
+ + timeSeriesSchemaCache.getRequestCount());
}
public static DataNodeSchemaCache getInstance() {
@@ -110,110 +106,60 @@ public class DataNodeSchemaCache {
* @return timeseries partialPath and its SchemaEntity
*/
public ClusterSchemaTree get(PartialPath devicePath, String[] measurements) {
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- Set<String> storageGroupSet = new HashSet<>();
-
- dualKeyCache.compute(
- new IDualKeyCacheComputation<PartialPath, String, SchemaCacheEntry>() {
- @Override
- public PartialPath getFirstKey() {
- return devicePath;
- }
-
- @Override
- public String[] getSecondKeyList() {
- return measurements;
- }
-
- @Override
- public void computeValue(int index, SchemaCacheEntry value) {
- if (value != null) {
- schemaTree.appendSingleMeasurement(
- devicePath.concatNode(value.getSchemaEntryId()),
- value.getMeasurementSchema(),
- value.getTagMap(),
- null,
- value.isAligned());
- storageGroupSet.add(value.getStorageGroup());
- }
- }
- });
- schemaTree.setDatabases(storageGroupSet);
- return schemaTree;
+ return timeSeriesSchemaCache.get(devicePath, measurements);
}
public ClusterSchemaTree get(PartialPath fullPath) {
- SchemaCacheEntry schemaCacheEntry =
- dualKeyCache.get(fullPath.getDevicePath(), fullPath.getMeasurement());
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- if (schemaCacheEntry != null) {
- schemaTree.appendSingleMeasurement(
- fullPath,
- schemaCacheEntry.getMeasurementSchema(),
- schemaCacheEntry.getTagMap(),
- null,
- schemaCacheEntry.isAligned());
- schemaTree.setDatabases(Collections.singleton(schemaCacheEntry.getStorageGroup()));
+ ClusterSchemaTree clusterSchemaTree = deviceUsingTemplateSchemaCache.get(fullPath);
+ if (clusterSchemaTree == null) {
+ return timeSeriesSchemaCache.get(fullPath);
+ } else {
+ return clusterSchemaTree;
}
- return schemaTree;
}
- public List<Integer> compute(ISchemaComputation schemaComputation) {
- List<Integer> indexOfMissingMeasurements = new ArrayList<>();
- final AtomicBoolean isFirstMeasurement = new AtomicBoolean(true);
- dualKeyCache.compute(
- new IDualKeyCacheComputation<PartialPath, String, SchemaCacheEntry>() {
- @Override
- public PartialPath getFirstKey() {
- return schemaComputation.getDevicePath();
- }
-
- @Override
- public String[] getSecondKeyList() {
- return schemaComputation.getMeasurements();
- }
-
- @Override
- public void computeValue(int index, SchemaCacheEntry value) {
- if (value == null) {
- indexOfMissingMeasurements.add(index);
- } else {
- if (isFirstMeasurement.get()) {
- schemaComputation.computeDevice(value.isAligned());
- isFirstMeasurement.getAndSet(false);
- }
- schemaComputation.computeMeasurement(index, value);
- }
- }
- });
- return indexOfMissingMeasurements;
+ public List<Integer> computeWithoutTemplate(ISchemaComputation schemaComputation) {
+ return timeSeriesSchemaCache.compute(schemaComputation);
}
- public void put(ClusterSchemaTree schemaTree) {
- for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
- putSingleMeasurementPath(schemaTree.getBelongedDatabase(measurementPath), measurementPath);
- }
+ public List<Integer> computeWithTemplate(ISchemaComputation schemaComputation) {
+ return deviceUsingTemplateSchemaCache.compute(schemaComputation);
}
- public void putSingleMeasurementPath(String storageGroup, MeasurementPath measurementPath) {
- SchemaCacheEntry schemaCacheEntry =
- new SchemaCacheEntry(
- storageGroup,
- (MeasurementSchema) measurementPath.getMeasurementSchema(),
- measurementPath.getTagMap(),
- measurementPath.isUnderAlignedEntity());
- dualKeyCache.put(
- measurementPath.getDevicePath(), measurementPath.getMeasurement(), schemaCacheEntry);
- }
+ /**
+ * Store the fetched schema in either the schemaCache or templateSchemaCache, depending on its
+ * associated device.
+ */
+ public void put(ClusterSchemaTree tree) {
+ Optional<Pair<Template, ?>> templateInfo;
+ PartialPath devicePath;
+ Set<PartialPath> templateDevices = new HashSet<>();
+ Set<PartialPath> commonDevices = new HashSet<>();
+ for (MeasurementPath path : tree.getAllMeasurement()) {
+ devicePath = path.getDevicePath();
+ if (templateDevices.contains(devicePath)) {
+ continue;
+ }
- public TimeValuePair getLastCache(PartialPath seriesPath) {
- SchemaCacheEntry entry =
- dualKeyCache.get(seriesPath.getDevicePath(), seriesPath.getMeasurement());
- if (null == entry) {
- return null;
+ if (commonDevices.contains(devicePath)) {
+ timeSeriesSchemaCache.putSingleMeasurementPath(tree.getBelongedDatabase(path), path);
+ continue;
+ }
+
+ templateInfo = Optional.ofNullable(templateManager.checkTemplateSetInfo(devicePath));
+ if (templateInfo.isPresent()) {
+ deviceUsingTemplateSchemaCache.put(
+ devicePath, tree.getBelongedDatabase(devicePath), templateInfo.get().left.getId());
+ templateDevices.add(devicePath);
+ } else {
+ timeSeriesSchemaCache.putSingleMeasurementPath(tree.getBelongedDatabase(path), path);
+ commonDevices.add(devicePath);
+ }
}
+ }
- return DataNodeLastCacheManager.getLastCache(entry);
+ public TimeValuePair getLastCache(PartialPath seriesPath) {
+ return timeSeriesSchemaCache.getLastCache(seriesPath);
}
/** get SchemaCacheEntry and update last cache */
@@ -223,13 +169,8 @@ public class DataNodeSchemaCache {
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
Long latestFlushedTime) {
- SchemaCacheEntry entry = dualKeyCache.get(devicePath, measurement);
- if (null == entry) {
- return;
- }
-
- DataNodeLastCacheManager.updateLastCache(
- entry, timeValuePair, highPriorityUpdate, latestFlushedTime);
+ timeSeriesSchemaCache.updateLastCache(
+ devicePath, measurement, timeValuePair, highPriorityUpdate, latestFlushedTime);
}
/**
@@ -242,33 +183,17 @@ public class DataNodeSchemaCache {
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
Long latestFlushedTime) {
- PartialPath seriesPath = measurementPath.transformToPartialPath();
- SchemaCacheEntry entry =
- dualKeyCache.get(seriesPath.getDevicePath(), seriesPath.getMeasurement());
- if (null == entry) {
- synchronized (dualKeyCache) {
- entry = dualKeyCache.get(seriesPath.getDevicePath(), seriesPath.getMeasurement());
- if (null == entry) {
- entry =
- new SchemaCacheEntry(
- storageGroup,
- (MeasurementSchema) measurementPath.getMeasurementSchema(),
- measurementPath.getTagMap(),
- measurementPath.isUnderAlignedEntity());
- dualKeyCache.put(seriesPath.getDevicePath(), seriesPath.getMeasurement(), entry);
- }
- }
- }
-
- DataNodeLastCacheManager.updateLastCache(
- entry, timeValuePair, highPriorityUpdate, latestFlushedTime);
+ timeSeriesSchemaCache.updateLastCache(
+ storageGroup, measurementPath, timeValuePair, highPriorityUpdate, latestFlushedTime);
}
public void invalidateAll() {
- dualKeyCache.invalidateAll();
+ deviceUsingTemplateSchemaCache.invalidateCache();
+ timeSeriesSchemaCache.invalidateAll();
}
public void cleanUp() {
- dualKeyCache.cleanUp();
+ deviceUsingTemplateSchemaCache.invalidateCache();
+ timeSeriesSchemaCache.invalidateAll();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeTemplateSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DeviceUsingTemplateSchemaCache.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeTemplateSchemaCache.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/cache/DeviceUsingTemplateSchemaCache.java
index 52730b7dfa..f030fda35c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeTemplateSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DeviceUsingTemplateSchemaCache.java
@@ -22,12 +22,11 @@ package org.apache.iotdb.db.metadata.cache;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.ITemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaComputationWithAutoCreation;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaComputation;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -41,21 +40,19 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class DataNodeTemplateSchemaCache {
+public class DeviceUsingTemplateSchemaCache {
- private static final Logger logger = LoggerFactory.getLogger(DataNodeTemplateSchemaCache.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(DeviceUsingTemplateSchemaCache.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final Cache<PartialPath, DeviceCacheEntry> cache;
- private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
+ private final ITemplateManager templateManager;
- // cache update due to activation or clear procedure
- private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
-
- private DataNodeTemplateSchemaCache() {
+ DeviceUsingTemplateSchemaCache(ITemplateManager templateManager) {
+ this.templateManager = templateManager;
// TODO proprietary config parameter expected
cache =
Caffeine.newBuilder()
@@ -63,32 +60,16 @@ public class DataNodeTemplateSchemaCache {
.weigher(
(Weigher<PartialPath, DeviceCacheEntry>)
(key, val) -> (PartialPath.estimateSize(key) + 32))
+ .recordStats()
.build();
}
- public static DataNodeTemplateSchemaCache getInstance() {
- return DataNodeTemplateSchemaCacheHolder.INSTANCE;
- }
-
- /** singleton pattern. */
- private static class DataNodeTemplateSchemaCacheHolder {
- private static final DataNodeTemplateSchemaCache INSTANCE = new DataNodeTemplateSchemaCache();
- }
-
- public void takeReadLock() {
- readWriteLock.readLock().lock();
- }
-
- public void releaseReadLock() {
- readWriteLock.readLock().unlock();
- }
-
- public void takeWriteLock() {
- readWriteLock.writeLock().lock();
+ public long getHitCount() {
+ return cache.stats().hitCount();
}
- public void releaseWriteLock() {
- readWriteLock.writeLock().unlock();
+ public long getRequestCount() {
+ return cache.stats().requestCount();
}
public ClusterSchemaTree get(PartialPath fullPath) {
@@ -118,7 +99,7 @@ public class DataNodeTemplateSchemaCache {
* @param computation
* @return true if conform to template cache, which means no need to fetch or create anymore
*/
- public List<Integer> conformsToTemplateCache(ISchemaComputationWithAutoCreation computation) {
+ public List<Integer> compute(ISchemaComputation computation) {
List<Integer> indexOfMissingMeasurements = new ArrayList<>();
PartialPath devicePath = computation.getDevicePath();
String[] measurements = computation.getMeasurements();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
similarity index 87%
copy from server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
index 778a0ea2f8..2fccef50d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.metadata.cache;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCache;
@@ -42,23 +41,15 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-/**
- * This class takes the responsibility of metadata cache management of all DataRegions under
- * StorageEngine
- */
-public class DataNodeSchemaCache {
+public class TimeSeriesSchemaCache {
private static final Logger logger = LoggerFactory.getLogger(DataNodeSchemaCache.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final IDualKeyCache<PartialPath, String, SchemaCacheEntry> dualKeyCache;
- // cache update or clean have higher priority than cache read
- private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
-
- private DataNodeSchemaCache() {
+ TimeSeriesSchemaCache() {
DualKeyCacheBuilder<PartialPath, String, SchemaCacheEntry> dualKeyCacheBuilder =
new DualKeyCacheBuilder<>();
dualKeyCache =
@@ -69,37 +60,14 @@ public class DataNodeSchemaCache {
.secondKeySizeComputer(s -> 32 + 2 * s.length())
.valueSizeComputer(SchemaCacheEntry::estimateSize)
.build();
-
- MetricService.getInstance().addMetricSet(new DataNodeSchemaCacheMetrics(this));
- }
-
- public double getHitRate() {
- return dualKeyCache.stats().hitRate() * 100;
- }
-
- public static DataNodeSchemaCache getInstance() {
- return DataNodeSchemaCacheHolder.INSTANCE;
- }
-
- /** singleton pattern. */
- private static class DataNodeSchemaCacheHolder {
- private static final DataNodeSchemaCache INSTANCE = new DataNodeSchemaCache();
- }
-
- public void takeReadLock() {
- readWriteLock.readLock().lock();
- }
-
- public void releaseReadLock() {
- readWriteLock.readLock().unlock();
}
- public void takeWriteLock() {
- readWriteLock.writeLock().lock();
+ public long getHitCount() {
+ return dualKeyCache.stats().hitCount();
}
- public void releaseWriteLock() {
- readWriteLock.writeLock().unlock();
+ public long getRequestCount() {
+ return dualKeyCache.stats().requestCount();
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 18a860f2d4..5bac0a831f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -18,13 +18,11 @@
*/
package org.apache.iotdb.db.mpp.plan.analyze.schema;
-import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
-import org.apache.iotdb.db.metadata.cache.DataNodeTemplateSchemaCache;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.ITemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
@@ -46,7 +44,6 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -58,8 +55,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
private final Coordinator coordinator = Coordinator.getInstance();
private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
- private final DataNodeTemplateSchemaCache templateSchemaCache =
- DataNodeTemplateSchemaCache.getInstance();
MPPQueryContext context = null;
private final AutoCreateSchemaExecutor autoCreateSchemaExecutor =
@@ -90,16 +85,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
ClusterPartitionFetcher.getInstance(),
this,
config.getQueryTimeoutThreshold()),
- this::cacheUpdater);
+ schemaCache::put);
private final NormalSchemaFetcher normalSchemaFetcher =
new NormalSchemaFetcher(schemaCache, autoCreateSchemaExecutor, clusterSchemaFetchExecutor);
private final TemplateSchemaFetcher templateSchemaFetcher =
new TemplateSchemaFetcher(
- templateManager,
- templateSchemaCache,
- autoCreateSchemaExecutor,
- clusterSchemaFetchExecutor);
+ templateManager, schemaCache, autoCreateSchemaExecutor, clusterSchemaFetchExecutor);
private static final class ClusterSchemaFetcherHolder {
private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
@@ -140,10 +132,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
ClusterSchemaTree cachedSchema;
Set<String> storageGroupSet = new HashSet<>();
for (PartialPath fullPath : fullPathList) {
- cachedSchema = templateSchemaCache.get(fullPath);
- if (cachedSchema.isEmpty()) {
- cachedSchema = schemaCache.get(fullPath);
- }
+ cachedSchema = schemaCache.get(fullPath);
if (cachedSchema.isEmpty()) {
isAllCached = false;
break;
@@ -170,45 +159,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
return clusterSchemaFetchExecutor.fetchSchemaOfFuzzyMatch(patternTree, true);
}
- /**
- * Store the fetched schema in either the schemaCache or templateSchemaCache, depending on its
- * associated device.
- */
- private void cacheUpdater(ClusterSchemaTree tree) {
- Optional<Pair<Template, ?>> templateInfo;
- PartialPath devicePath;
- Set<PartialPath> templateDevices = new HashSet<>();
- Set<PartialPath> commonDevices = new HashSet<>();
- for (MeasurementPath path : tree.getAllMeasurement()) {
- devicePath = path.getDevicePath();
- if (templateDevices.contains(devicePath)) {
- continue;
- }
-
- if (commonDevices.contains(devicePath)) {
- schemaCache.putSingleMeasurementPath(tree.getBelongedDatabase(path), path);
- continue;
- }
-
- templateInfo = Optional.ofNullable(templateManager.checkTemplateSetInfo(devicePath));
- if (templateInfo.isPresent()) {
- templateSchemaCache.put(
- devicePath, tree.getBelongedDatabase(devicePath), templateInfo.get().left.getId());
- templateDevices.add(devicePath);
- } else {
- schemaCache.putSingleMeasurementPath(tree.getBelongedDatabase(path), path);
- commonDevices.add(devicePath);
- }
- }
- }
-
@Override
public void fetchAndComputeSchemaWithAutoCreate(
ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
// The schema cache R/W and fetch operation must be locked together thus the cache clean
// operation executed by delete timeseries will be effective.
schemaCache.takeReadLock();
- templateSchemaCache.takeReadLock();
try {
Pair<Template, PartialPath> templateSetInfo =
templateManager.checkTemplateSetInfo(schemaComputationWithAutoCreation.getDevicePath());
@@ -235,7 +191,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
} finally {
schemaCache.releaseReadLock();
- templateSchemaCache.releaseReadLock();
}
}
@@ -245,7 +200,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
// The schema cache R/W and fetch operation must be locked together thus the cache clean
// operation executed by delete timeseries will be effective.
schemaCache.takeReadLock();
- templateSchemaCache.takeReadLock();
try {
List<ISchemaComputationWithAutoCreation> normalTimeSeriesRequestList = new ArrayList<>();
@@ -273,7 +227,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
} finally {
schemaCache.releaseReadLock();
- templateSchemaCache.releaseReadLock();
}
}
@@ -429,7 +382,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
@Override
public void invalidAllCache() {
- DataNodeSchemaCache.getInstance().invalidateAll();
- DataNodeTemplateSchemaCache.getInstance().invalidateCache();
+ schemaCache.invalidateAll();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
index e162e88e60..f8f0d37f1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
@@ -50,7 +50,7 @@ class NormalSchemaFetcher {
List<Integer> processNormalTimeSeries(
ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
List<Integer> indexOfMissingMeasurements =
- schemaCache.compute(schemaComputationWithAutoCreation);
+ schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreation);
// all schema can be taken from cache
if (indexOfMissingMeasurements.isEmpty()) {
return indexOfMissingMeasurements;
@@ -98,7 +98,8 @@ class NormalSchemaFetcher {
List<Integer> indexOfMissingMeasurements;
for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
- indexOfMissingMeasurements = schemaCache.compute(schemaComputationWithAutoCreation);
+ indexOfMissingMeasurements =
+ schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreation);
if (!indexOfMissingMeasurements.isEmpty()) {
indexOfDevicesWithMissingMeasurements.add(i);
indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/TemplateSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/TemplateSchemaFetcher.java
index 3f94243861..906d848a80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/TemplateSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/TemplateSchemaFetcher.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.mpp.plan.analyze.schema;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.metadata.cache.DataNodeTemplateSchemaCache;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.template.ITemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.alter.TemplateExtendInfo;
@@ -43,14 +43,14 @@ class TemplateSchemaFetcher {
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final ITemplateManager templateManager;
- private final DataNodeTemplateSchemaCache templateSchemaCache;
+ private final DataNodeSchemaCache templateSchemaCache;
private final AutoCreateSchemaExecutor autoCreateSchemaExecutor;
private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor;
TemplateSchemaFetcher(
ITemplateManager templateManager,
- DataNodeTemplateSchemaCache templateSchemaCache,
+ DataNodeSchemaCache templateSchemaCache,
AutoCreateSchemaExecutor autoCreateSchemaExecutor,
ClusterSchemaFetchExecutor clusterSchemaFetchExecutor) {
this.templateManager = templateManager;
@@ -80,7 +80,7 @@ class TemplateSchemaFetcher {
}
List<Integer> indexOfMissingMeasurements =
- templateSchemaCache.conformsToTemplateCache(schemaComputationWithAutoCreation);
+ templateSchemaCache.computeWithTemplate(schemaComputationWithAutoCreation);
// all schema can be taken from cache
if (indexOfMissingMeasurements.isEmpty()) {
return indexOfMissingMeasurements;
@@ -157,7 +157,7 @@ class TemplateSchemaFetcher {
for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
indexOfMissingMeasurements =
- templateSchemaCache.conformsToTemplateCache(schemaComputationWithAutoCreation);
+ templateSchemaCache.computeWithTemplate(schemaComputationWithAutoCreation);
if (!indexOfMissingMeasurements.isEmpty()) {
indexOfDevicesWithMissingMeasurements.add(i);
indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index f29dddcac5..6a6e9e1b3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -62,7 +62,6 @@ import org.apache.iotdb.db.engine.settle.SettleRequestHandler;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.cache.DataNodeDevicePathCache;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
-import org.apache.iotdb.db.metadata.cache.DataNodeTemplateSchemaCache;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -415,14 +414,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
@Override
public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
DataNodeSchemaCache.getInstance().takeWriteLock();
- DataNodeTemplateSchemaCache.getInstance().takeWriteLock();
try {
DataNodeSchemaCache.getInstance().invalidateAll();
- DataNodeTemplateSchemaCache.getInstance().invalidateCache();
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
DataNodeSchemaCache.getInstance().releaseWriteLock();
- DataNodeTemplateSchemaCache.getInstance().releaseWriteLock();
}
}
@@ -488,16 +484,12 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
public TSStatus invalidateMatchedSchemaCache(TInvalidateMatchedSchemaCacheReq req)
throws TException {
DataNodeSchemaCache cache = DataNodeSchemaCache.getInstance();
- DataNodeTemplateSchemaCache templateSchemaCache = DataNodeTemplateSchemaCache.getInstance();
cache.takeWriteLock();
- templateSchemaCache.takeWriteLock();
try {
// todo implement precise timeseries clean rather than clean all
cache.invalidateAll();
- templateSchemaCache.invalidateCache();
} finally {
cache.releaseWriteLock();
- templateSchemaCache.releaseWriteLock();
}
return RpcUtils.SUCCESS_STATUS;
}