You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/05/10 16:50:02 UTC
[iotdb] branch master updated: Support FIFO policy for DataNodeSchemaCache Eviction (#9809)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8346ab743e Support FIFO policy for DataNodeSchemaCache Eviction (#9809)
8346ab743e is described below
commit 8346ab743e6d8254606d737ef3d676033b175789
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Thu May 11 00:49:54 2023 +0800
Support FIFO policy for DataNodeSchemaCache Eviction (#9809)
---
.../resources/conf/iotdb-common.properties | 4 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +
.../iotdb/db/engine/storagegroup/DataRegion.java | 27 ++-
.../db/metadata/cache/DataNodeSchemaCache.java | 2 +-
.../db/metadata/cache/TimeSeriesSchemaCache.java | 3 +-
.../dualkeycache/impl/DualKeyCacheBuilder.java | 18 +-
.../dualkeycache/impl/DualKeyCachePolicy.java | 3 +-
.../dualkeycache/impl/FIFOCacheEntryManager.java | 189 +++++++++++++++++++++
.../mtree/snapshot/MemMTreeSnapshotUtil.java | 6 +
.../cache/dualkeycache/DualKeyCacheTest.java | 17 ++
11 files changed, 273 insertions(+), 11 deletions(-)
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 323e57d00d..690cce6e68 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -293,6 +293,10 @@ cluster_name=defaultCluster
# which exceeds this num, will be split to several plans with timeseries no more than this num.
# max_measurement_num_of_internal_request=10000
+# Policy of DataNodeSchemaCache eviction.
+# Support FIFO and LRU policy. FIFO takes low cache update overhead. LRU takes high cache hit rate.
+# datanode_schema_cache_eviction_policy=FIFO
+
####################
### Configurations for creating schema automatically
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ae8ae5c1d6..becba4918e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1021,6 +1021,9 @@ public class IoTDBConfig {
/** Memory allocated for LastCache */
private long allocateMemoryForLastCache = allocateMemoryForSchema / 10;
+ /** Policy of DataNodeSchemaCache eviction */
+ private String dataNodeSchemaCacheEvictionPolicy = "FIFO";
+
private String readConsistencyLevel = "strong";
/** Maximum execution time of a DriverTask */
@@ -3317,6 +3320,14 @@ public class IoTDBConfig {
this.allocateMemoryForLastCache = allocateMemoryForLastCache;
}
+ public String getDataNodeSchemaCacheEvictionPolicy() {
+ return dataNodeSchemaCacheEvictionPolicy;
+ }
+
+ public void setDataNodeSchemaCacheEvictionPolicy(String dataNodeSchemaCacheEvictionPolicy) {
+ this.dataNodeSchemaCacheEvictionPolicy = dataNodeSchemaCacheEvictionPolicy;
+ }
+
public String getReadConsistencyLevel() {
return readConsistencyLevel;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7b30821575..9c04d208cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1073,6 +1073,10 @@ public class IoTDBDescriptor {
conf.setSortTmpDir(properties.getProperty("sort_tmp_dir", conf.getSortTmpDir()));
conf.setRateLimiterType(properties.getProperty("rate_limiter_type", conf.getRateLimiterType()));
+
+ conf.setDataNodeSchemaCacheEvictionPolicy(
+ properties.getProperty(
+ "datanode_schema_cache_eviction_policy", conf.getDataNodeSchemaCacheEvictionPolicy()));
}
private void loadAuthorCache(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index a9be788484..91b440ab85 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -106,6 +106,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.commons.io.FileUtils;
@@ -1151,11 +1152,22 @@ public class DataRegion implements IDataRegionForQuery {
&& !node.isFromLeaderWhenUsingIoTConsensus())) {
return;
}
+ String[] measurements = node.getMeasurements();
+ MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
+ String[] rawMeasurements = new String[measurements.length];
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurementSchemas[i] != null) {
+ // get raw measurement rather than alias
+ rawMeasurements[i] = measurementSchemas[i].getMeasurementId();
+ } else {
+ rawMeasurements[i] = measurements[i];
+ }
+ }
DataNodeSchemaCache.getInstance()
.updateLastCache(
getDatabaseName(),
node.getDevicePath(),
- node.getMeasurements(),
+ rawMeasurements,
node.getMeasurementSchemas(),
node.isAligned(),
node::composeLastTimeValuePair,
@@ -1193,11 +1205,22 @@ public class DataRegion implements IDataRegionForQuery {
&& !node.isFromLeaderWhenUsingIoTConsensus())) {
return;
}
+ String[] measurements = node.getMeasurements();
+ MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
+ String[] rawMeasurements = new String[measurements.length];
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurementSchemas[i] != null) {
+ // get raw measurement rather than alias
+ rawMeasurements[i] = measurementSchemas[i].getMeasurementId();
+ } else {
+ rawMeasurements[i] = measurements[i];
+ }
+ }
DataNodeSchemaCache.getInstance()
.updateLastCache(
getDatabaseName(),
node.getDevicePath(),
- node.getMeasurements(),
+ rawMeasurements,
node.getMeasurementSchemas(),
node.isAligned(),
node::composeTimeValuePair,
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index a9a981088c..62d661fd7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -113,7 +113,7 @@ public class DataNodeSchemaCache {
public ClusterSchemaTree get(PartialPath fullPath) {
ClusterSchemaTree clusterSchemaTree = deviceUsingTemplateSchemaCache.get(fullPath);
- if (clusterSchemaTree == null) {
+ if (clusterSchemaTree == null || clusterSchemaTree.isEmpty()) {
return timeSeriesSchemaCache.get(fullPath);
} else {
return clusterSchemaTree;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
index d87527e2b8..755c30bd9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
@@ -55,7 +55,8 @@ public class TimeSeriesSchemaCache {
new DualKeyCacheBuilder<>();
dualKeyCache =
dualKeyCacheBuilder
- .cacheEvictionPolicy(DualKeyCachePolicy.LRU)
+ .cacheEvictionPolicy(
+ DualKeyCachePolicy.valueOf(config.getDataNodeSchemaCacheEvictionPolicy()))
.memoryCapacity(config.getAllocateMemoryForSchemaCache())
.firstKeySizeComputer(PartialPath::estimateSize)
.secondKeySizeComputer(s -> 32 + 2 * s.length())
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
index d26cc8ac68..27b5054b04 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
@@ -32,7 +32,7 @@ import java.util.function.Function;
*/
public class DualKeyCacheBuilder<FK, SK, V> {
- private LRUCacheEntryManager<FK, SK, V> cacheEntryManager;
+ private DualKeyCachePolicy policy;
private long memoryCapacity;
@@ -44,6 +44,15 @@ public class DualKeyCacheBuilder<FK, SK, V> {
/** Initiate and return a dual key cache instance. */
public IDualKeyCache<FK, SK, V> build() {
+ ICacheEntryManager<FK, SK, V, ?> cacheEntryManager = null;
+ switch (policy) {
+ case LRU:
+ cacheEntryManager = new LRUCacheEntryManager<>();
+ break;
+ case FIFO:
+ cacheEntryManager = new FIFOCacheEntryManager<>();
+ break;
+ }
return new DualKeyCacheImpl<>(
cacheEntryManager,
new CacheSizeComputerImpl<>(firstKeySizeComputer, secondKeySizeComputer, valueSizeComputer),
@@ -52,11 +61,8 @@ public class DualKeyCacheBuilder<FK, SK, V> {
/** Define the cache eviction policy of dual key cache. */
public DualKeyCacheBuilder<FK, SK, V> cacheEvictionPolicy(DualKeyCachePolicy policy) {
- if (policy == DualKeyCachePolicy.LRU) {
- this.cacheEntryManager = new LRUCacheEntryManager<>();
- return this;
- }
- throw new IllegalStateException();
+ this.policy = policy;
+ return this;
}
/** Define the memory capacity of dual key cache. */
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
index 0211a98231..562cf69917 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
@@ -20,5 +20,6 @@
package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
public enum DualKeyCachePolicy {
- LRU;
+ LRU,
+ FIFO;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/FIFOCacheEntryManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/FIFOCacheEntryManager.java
new file mode 100644
index 0000000000..00f43a5e80
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/FIFOCacheEntryManager.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class FIFOCacheEntryManager<FK, SK, V>
+ implements ICacheEntryManager<FK, SK, V, FIFOCacheEntryManager.FIFOCacheEntry<SK, V>> {
+
+ private static final int SLOT_NUM = 128;
+
+ private final FIFOLinkedList[] fifoLinkedLists = new FIFOLinkedList[SLOT_NUM];
+
+ private final AtomicInteger cachePutRoundRobinIndex = new AtomicInteger(0);
+
+ private final AtomicInteger cacheEvictRoundRobinIndex = new AtomicInteger(0);
+
+ @Override
+ public FIFOCacheEntry<SK, V> createCacheEntry(
+ SK secondKey, V value, ICacheEntryGroup<FK, SK, V, FIFOCacheEntry<SK, V>> cacheEntryGroup) {
+ return new FIFOCacheEntry<>(secondKey, value, cacheEntryGroup);
+ }
+
+ @Override
+ public void access(FIFOCacheEntry<SK, V> cacheEntry) {
+ // do nothing
+ }
+
+ @Override
+ public void put(FIFOCacheEntry<SK, V> cacheEntry) {
+ getNextList(cachePutRoundRobinIndex).add(cacheEntry);
+ }
+
+ @Override
+ public FIFOCacheEntry<SK, V> evict() {
+ int startIndex = getNextIndex(cacheEvictRoundRobinIndex);
+ FIFOLinkedList fifoLinkedList;
+ FIFOCacheEntry<SK, V> cacheEntry;
+ for (int i = 0; i < SLOT_NUM; i++) {
+ if (startIndex == SLOT_NUM) {
+ startIndex = 0;
+ }
+ fifoLinkedList = fifoLinkedLists[startIndex];
+ if (fifoLinkedList != null) {
+ cacheEntry = fifoLinkedList.evict();
+ if (cacheEntry != null) {
+ return cacheEntry;
+ }
+ }
+ startIndex++;
+ }
+ return null;
+ }
+
+ @Override
+ public void cleanUp() {
+ synchronized (fifoLinkedLists) {
+ for (int i = 0; i < SLOT_NUM; i++) {
+ fifoLinkedLists[i] = null;
+ }
+ }
+ }
+
+ private FIFOLinkedList getNextList(AtomicInteger roundRobinIndex) {
+ int listIndex = getNextIndex(roundRobinIndex);
+ FIFOLinkedList fifoLinkedList = fifoLinkedLists[listIndex];
+ if (fifoLinkedList == null) {
+ synchronized (fifoLinkedLists) {
+ fifoLinkedList = fifoLinkedLists[listIndex];
+ if (fifoLinkedList == null) {
+ fifoLinkedList = new FIFOLinkedList();
+ fifoLinkedLists[listIndex] = fifoLinkedList;
+ }
+ }
+ }
+ return fifoLinkedList;
+ }
+
+ private int getNextIndex(AtomicInteger roundRobinIndex) {
+ return roundRobinIndex.getAndUpdate(
+ currentValue -> {
+ currentValue = currentValue + 1;
+ return currentValue >= SLOT_NUM ? 0 : currentValue;
+ });
+ }
+
+ static class FIFOCacheEntry<SK, V> implements ICacheEntry<SK, V> {
+
+ private final SK secondKey;
+ private final ICacheEntryGroup cacheEntryGroup;
+
+ private V value;
+
+ private FIFOCacheEntry<SK, V> pre;
+
+ private FIFOCacheEntry(SK secondKey, V value, ICacheEntryGroup cacheEntryGroup) {
+ this.secondKey = secondKey;
+ this.value = value;
+ this.cacheEntryGroup = cacheEntryGroup;
+ }
+
+ @Override
+ public SK getSecondKey() {
+ return secondKey;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public ICacheEntryGroup getBelongedGroup() {
+ return cacheEntryGroup;
+ }
+
+ @Override
+ public void replaceValue(V newValue) {
+ this.value = newValue;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FIFOCacheEntry<?, ?> that = (FIFOCacheEntry<?, ?>) o;
+ return Objects.equals(secondKey, that.secondKey)
+ && Objects.equals(cacheEntryGroup, that.cacheEntryGroup);
+ }
+
+ @Override
+ public int hashCode() {
+ return cacheEntryGroup.hashCode() * 31 + secondKey.hashCode();
+ }
+ }
+
+ private static class FIFOLinkedList {
+
+ private FIFOCacheEntry head;
+ private FIFOCacheEntry tail;
+
+ synchronized void add(FIFOCacheEntry cacheEntry) {
+ if (head == null) {
+ head = cacheEntry;
+ tail = cacheEntry;
+ return;
+ }
+
+ head.pre = cacheEntry;
+
+ head = cacheEntry;
+ }
+
+ synchronized FIFOCacheEntry evict() {
+ if (tail == null) {
+ return null;
+ }
+
+ FIFOCacheEntry cacheEntry = tail;
+ tail = tail.pre;
+
+ if (tail == null) {
+ head = null;
+ }
+
+ cacheEntry.pre = null;
+
+ return cacheEntry;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
index aea5f1854a..cb017d9871 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
@@ -257,6 +257,12 @@ public class MemMTreeSnapshotUtil {
if (!ancestors.isEmpty()) {
node.setParent(ancestors.peek());
ancestors.peek().addChild(node);
+ if (node.isMeasurement() && node.getAsMeasurementMNode().getAlias() != null) {
+ ancestors
+ .peek()
+ .getAsDeviceMNode()
+ .addAlias(node.getAsMeasurementMNode().getAlias(), node.getAsMeasurementMNode());
+ }
}
// Storage type means current node is root node, so it must be returned.
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
index 7184124927..557328326f 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
@@ -24,9 +24,26 @@ import org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCachePolicy;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(Parameterized.class)
public class DualKeyCacheTest {
+ private final String policy;
+
+ public DualKeyCacheTest(String policy) {
+ this.policy = policy;
+ }
+
+ @Parameterized.Parameters
+ public static List<String> getTestModes() {
+ return Arrays.asList("FIFO", "LRU");
+ }
+
@Test
public void testBasicReadPut() {
DualKeyCacheBuilder<String, String, String> dualKeyCacheBuilder = new DualKeyCacheBuilder<>();