You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/05/10 16:50:02 UTC

[iotdb] branch master updated: Support FIFO policy for DataNodeSchemaCache Eviction (#9809)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8346ab743e Support FIFO policy for DataNodeSchemaCache Eviction (#9809)
8346ab743e is described below

commit 8346ab743e6d8254606d737ef3d676033b175789
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Thu May 11 00:49:54 2023 +0800

    Support FIFO policy for DataNodeSchemaCache Eviction (#9809)
---
 .../resources/conf/iotdb-common.properties         |   4 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 +
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  27 ++-
 .../db/metadata/cache/DataNodeSchemaCache.java     |   2 +-
 .../db/metadata/cache/TimeSeriesSchemaCache.java   |   3 +-
 .../dualkeycache/impl/DualKeyCacheBuilder.java     |  18 +-
 .../dualkeycache/impl/DualKeyCachePolicy.java      |   3 +-
 .../dualkeycache/impl/FIFOCacheEntryManager.java   | 189 +++++++++++++++++++++
 .../mtree/snapshot/MemMTreeSnapshotUtil.java       |   6 +
 .../cache/dualkeycache/DualKeyCacheTest.java       |  17 ++
 11 files changed, 273 insertions(+), 11 deletions(-)

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 323e57d00d..690cce6e68 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -293,6 +293,10 @@ cluster_name=defaultCluster
 # which exceeds this num, will be split to several plans with timeseries no more than this num.
 # max_measurement_num_of_internal_request=10000
 
+# Policy of DataNodeSchemaCache eviction.
+# Supported policies: FIFO and LRU (upper case). FIFO has lower cache update overhead; LRU yields a higher cache hit rate.
+# datanode_schema_cache_eviction_policy=FIFO
+
 ####################
 ### Configurations for creating schema automatically
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ae8ae5c1d6..becba4918e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1021,6 +1021,9 @@ public class IoTDBConfig {
   /** Memory allocated for LastCache */
   private long allocateMemoryForLastCache = allocateMemoryForSchema / 10;
 
+  /** Policy of DataNodeSchemaCache eviction */
+  private String dataNodeSchemaCacheEvictionPolicy = "FIFO";
+
   private String readConsistencyLevel = "strong";
 
   /** Maximum execution time of a DriverTask */
@@ -3317,6 +3320,14 @@ public class IoTDBConfig {
     this.allocateMemoryForLastCache = allocateMemoryForLastCache;
   }
 
+  public String getDataNodeSchemaCacheEvictionPolicy() {
+    return dataNodeSchemaCacheEvictionPolicy;
+  }
+
+  public void setDataNodeSchemaCacheEvictionPolicy(String dataNodeSchemaCacheEvictionPolicy) {
+    this.dataNodeSchemaCacheEvictionPolicy = dataNodeSchemaCacheEvictionPolicy;
+  }
+
   public String getReadConsistencyLevel() {
     return readConsistencyLevel;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7b30821575..9c04d208cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1073,6 +1073,10 @@ public class IoTDBDescriptor {
     conf.setSortTmpDir(properties.getProperty("sort_tmp_dir", conf.getSortTmpDir()));
 
     conf.setRateLimiterType(properties.getProperty("rate_limiter_type", conf.getRateLimiterType()));
+
+    conf.setDataNodeSchemaCacheEvictionPolicy(
+        properties.getProperty(
+            "datanode_schema_cache_eviction_policy", conf.getDataNodeSchemaCacheEvictionPolicy()));
   }
 
   private void loadAuthorCache(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index a9be788484..91b440ab85 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -106,6 +106,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 
 import org.apache.commons.io.FileUtils;
@@ -1151,11 +1152,22 @@ public class DataRegion implements IDataRegionForQuery {
             && !node.isFromLeaderWhenUsingIoTConsensus())) {
       return;
     }
+    String[] measurements = node.getMeasurements();
+    MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
+    String[] rawMeasurements = new String[measurements.length];
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurementSchemas[i] != null) {
+        // get raw measurement rather than alias
+        rawMeasurements[i] = measurementSchemas[i].getMeasurementId();
+      } else {
+        rawMeasurements[i] = measurements[i];
+      }
+    }
     DataNodeSchemaCache.getInstance()
         .updateLastCache(
             getDatabaseName(),
             node.getDevicePath(),
-            node.getMeasurements(),
+            rawMeasurements,
             node.getMeasurementSchemas(),
             node.isAligned(),
             node::composeLastTimeValuePair,
@@ -1193,11 +1205,22 @@ public class DataRegion implements IDataRegionForQuery {
             && !node.isFromLeaderWhenUsingIoTConsensus())) {
       return;
     }
+    String[] measurements = node.getMeasurements();
+    MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
+    String[] rawMeasurements = new String[measurements.length];
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurementSchemas[i] != null) {
+        // get raw measurement rather than alias
+        rawMeasurements[i] = measurementSchemas[i].getMeasurementId();
+      } else {
+        rawMeasurements[i] = measurements[i];
+      }
+    }
     DataNodeSchemaCache.getInstance()
         .updateLastCache(
             getDatabaseName(),
             node.getDevicePath(),
-            node.getMeasurements(),
+            rawMeasurements,
             node.getMeasurementSchemas(),
             node.isAligned(),
             node::composeTimeValuePair,
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index a9a981088c..62d661fd7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -113,7 +113,7 @@ public class DataNodeSchemaCache {
 
   public ClusterSchemaTree get(PartialPath fullPath) {
     ClusterSchemaTree clusterSchemaTree = deviceUsingTemplateSchemaCache.get(fullPath);
-    if (clusterSchemaTree == null) {
+    if (clusterSchemaTree == null || clusterSchemaTree.isEmpty()) {
       return timeSeriesSchemaCache.get(fullPath);
     } else {
       return clusterSchemaTree;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
index d87527e2b8..755c30bd9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
@@ -55,7 +55,8 @@ public class TimeSeriesSchemaCache {
         new DualKeyCacheBuilder<>();
     dualKeyCache =
         dualKeyCacheBuilder
-            .cacheEvictionPolicy(DualKeyCachePolicy.LRU)
+            .cacheEvictionPolicy(
+                DualKeyCachePolicy.valueOf(config.getDataNodeSchemaCacheEvictionPolicy()))
             .memoryCapacity(config.getAllocateMemoryForSchemaCache())
             .firstKeySizeComputer(PartialPath::estimateSize)
             .secondKeySizeComputer(s -> 32 + 2 * s.length())
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
index d26cc8ac68..27b5054b04 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheBuilder.java
@@ -32,7 +32,7 @@ import java.util.function.Function;
  */
 public class DualKeyCacheBuilder<FK, SK, V> {
 
-  private LRUCacheEntryManager<FK, SK, V> cacheEntryManager;
+  private DualKeyCachePolicy policy;
 
   private long memoryCapacity;
 
@@ -44,6 +44,15 @@ public class DualKeyCacheBuilder<FK, SK, V> {
 
   /** Initiate and return a dual key cache instance. */
   public IDualKeyCache<FK, SK, V> build() {
+    ICacheEntryManager<FK, SK, V, ?> cacheEntryManager = null;
+    switch (policy) {
+      case LRU:
+        cacheEntryManager = new LRUCacheEntryManager<>();
+        break;
+      case FIFO:
+        cacheEntryManager = new FIFOCacheEntryManager<>();
+        break;
+    }
     return new DualKeyCacheImpl<>(
         cacheEntryManager,
         new CacheSizeComputerImpl<>(firstKeySizeComputer, secondKeySizeComputer, valueSizeComputer),
@@ -52,11 +61,8 @@ public class DualKeyCacheBuilder<FK, SK, V> {
 
   /** Define the cache eviction policy of dual key cache. */
   public DualKeyCacheBuilder<FK, SK, V> cacheEvictionPolicy(DualKeyCachePolicy policy) {
-    if (policy == DualKeyCachePolicy.LRU) {
-      this.cacheEntryManager = new LRUCacheEntryManager<>();
-      return this;
-    }
-    throw new IllegalStateException();
+    this.policy = policy;
+    return this;
   }
 
   /** Define the memory capacity of dual key cache. */
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
index 0211a98231..562cf69917 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCachePolicy.java
@@ -20,5 +20,6 @@
 package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
 
 public enum DualKeyCachePolicy {
-  LRU;
+  LRU,
+  FIFO;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/FIFOCacheEntryManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/FIFOCacheEntryManager.java
new file mode 100644
index 0000000000..00f43a5e80
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/FIFOCacheEntryManager.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache.dualkeycache.impl;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Cache entry manager that evicts entries in (approximately) first-in-first-out order.
+ *
+ * <p>Entries are distributed round-robin over {@code SLOT_NUM} independent queues, each guarded
+ * by its own monitor. Every single queue is strictly FIFO; the order across queues is only
+ * approximate. Accessing an entry never changes its position.
+ *
+ * @param <FK> type of the first key
+ * @param <SK> type of the second key
+ * @param <V> type of the cached value
+ */
+public class FIFOCacheEntryManager<FK, SK, V>
+    implements ICacheEntryManager<FK, SK, V, FIFOCacheEntryManager.FIFOCacheEntry<SK, V>> {
+
+  private static final int SLOT_NUM = 128;
+
+  /**
+   * All slots are created eagerly and never replaced. Because the array is fully populated before
+   * it is stored in this final field, readers need no extra synchronization to see the slots, and
+   * a put can never land in a list that was concurrently dropped by {@link #cleanUp()}.
+   */
+  private final FIFOLinkedList<SK, V>[] fifoLinkedLists = createSlots();
+
+  private final AtomicInteger cachePutRoundRobinIndex = new AtomicInteger(0);
+
+  private final AtomicInteger cacheEvictRoundRobinIndex = new AtomicInteger(0);
+
+  /** Builds the slot array with one empty queue per slot. */
+  // Generic array creation is unavoidable here; only FIFOLinkedList<SK, V> instances are stored.
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static <SK, V> FIFOLinkedList<SK, V>[] createSlots() {
+    FIFOLinkedList<SK, V>[] slots = new FIFOLinkedList[SLOT_NUM];
+    for (int i = 0; i < SLOT_NUM; i++) {
+      slots[i] = new FIFOLinkedList<>();
+    }
+    return slots;
+  }
+
+  /** Creates a new, not yet queued entry for the given second key, value and owning group. */
+  @Override
+  public FIFOCacheEntry<SK, V> createCacheEntry(
+      SK secondKey, V value, ICacheEntryGroup<FK, SK, V, FIFOCacheEntry<SK, V>> cacheEntryGroup) {
+    return new FIFOCacheEntry<>(secondKey, value, cacheEntryGroup);
+  }
+
+  /** No-op: under FIFO an access does not influence the eviction order. */
+  @Override
+  public void access(FIFOCacheEntry<SK, V> cacheEntry) {
+    // do nothing
+  }
+
+  /** Appends the entry to the next queue in round-robin order. */
+  @Override
+  public void put(FIFOCacheEntry<SK, V> cacheEntry) {
+    fifoLinkedLists[getNextIndex(cachePutRoundRobinIndex)].add(cacheEntry);
+  }
+
+  /**
+   * Removes and returns the oldest entry of the first non-empty queue, starting at the next
+   * round-robin position and visiting every queue at most once.
+   *
+   * @return the evicted entry, or {@code null} if all queues are empty
+   */
+  @Override
+  public FIFOCacheEntry<SK, V> evict() {
+    int startIndex = getNextIndex(cacheEvictRoundRobinIndex);
+    for (int i = 0; i < SLOT_NUM; i++) {
+      FIFOCacheEntry<SK, V> cacheEntry = fifoLinkedLists[(startIndex + i) % SLOT_NUM].evict();
+      if (cacheEntry != null) {
+        return cacheEntry;
+      }
+    }
+    return null;
+  }
+
+  /** Drops every queued entry. The round-robin positions are left untouched. */
+  @Override
+  public void cleanUp() {
+    for (FIFOLinkedList<SK, V> fifoLinkedList : fifoLinkedLists) {
+      fifoLinkedList.clear();
+    }
+  }
+
+  /**
+   * Returns the current value of the given index and atomically advances it by one, wrapping to 0
+   * once {@code SLOT_NUM} is reached. The result is always within {@code [0, SLOT_NUM)}.
+   */
+  private int getNextIndex(AtomicInteger roundRobinIndex) {
+    return roundRobinIndex.getAndUpdate(
+        currentValue -> {
+          currentValue = currentValue + 1;
+          return currentValue >= SLOT_NUM ? 0 : currentValue;
+        });
+  }
+
+  /** Cache entry that doubles as a node of a {@link FIFOLinkedList}. */
+  static class FIFOCacheEntry<SK, V> implements ICacheEntry<SK, V> {
+
+    private final SK secondKey;
+    private final ICacheEntryGroup cacheEntryGroup;
+
+    private V value;
+
+    /** The entry that was queued right after this one, i.e. the next one to be evicted. */
+    private FIFOCacheEntry<SK, V> pre;
+
+    private FIFOCacheEntry(SK secondKey, V value, ICacheEntryGroup cacheEntryGroup) {
+      this.secondKey = secondKey;
+      this.value = value;
+      this.cacheEntryGroup = cacheEntryGroup;
+    }
+
+    @Override
+    public SK getSecondKey() {
+      return secondKey;
+    }
+
+    @Override
+    public V getValue() {
+      return value;
+    }
+
+    @Override
+    public ICacheEntryGroup getBelongedGroup() {
+      return cacheEntryGroup;
+    }
+
+    @Override
+    public void replaceValue(V newValue) {
+      this.value = newValue;
+    }
+
+    /** Two entries are equal when both their second key and their owning group are equal. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      FIFOCacheEntry<?, ?> that = (FIFOCacheEntry<?, ?>) o;
+      return Objects.equals(secondKey, that.secondKey)
+          && Objects.equals(cacheEntryGroup, that.cacheEntryGroup);
+    }
+
+    /** Null-safe like {@link #equals(Object)}; same value as before for non-null fields. */
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(cacheEntryGroup) * 31 + Objects.hashCode(secondKey);
+    }
+  }
+
+  /**
+   * Minimal singly linked FIFO queue. {@code head} is the most recently added entry, {@code tail}
+   * the oldest one; each entry's {@code pre} points to the entry added after it. All operations
+   * are synchronized on the list instance.
+   */
+  private static class FIFOLinkedList<SK, V> {
+
+    private FIFOCacheEntry<SK, V> head;
+    private FIFOCacheEntry<SK, V> tail;
+
+    /** Appends the entry as the newest element. */
+    synchronized void add(FIFOCacheEntry<SK, V> cacheEntry) {
+      if (head == null) {
+        head = cacheEntry;
+        tail = cacheEntry;
+        return;
+      }
+
+      head.pre = cacheEntry;
+
+      head = cacheEntry;
+    }
+
+    /** Removes and returns the oldest entry, or {@code null} if the list is empty. */
+    synchronized FIFOCacheEntry<SK, V> evict() {
+      if (tail == null) {
+        return null;
+      }
+
+      FIFOCacheEntry<SK, V> cacheEntry = tail;
+      tail = tail.pre;
+
+      if (tail == null) {
+        head = null;
+      }
+
+      // unlink the evicted entry so it does not keep younger entries reachable
+      cacheEntry.pre = null;
+
+      return cacheEntry;
+    }
+
+    /** Forgets every queued entry. */
+    synchronized void clear() {
+      head = null;
+      tail = null;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
index aea5f1854a..cb017d9871 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
@@ -257,6 +257,12 @@ public class MemMTreeSnapshotUtil {
     if (!ancestors.isEmpty()) {
       node.setParent(ancestors.peek());
       ancestors.peek().addChild(node);
+      if (node.isMeasurement() && node.getAsMeasurementMNode().getAlias() != null) {
+        ancestors
+            .peek()
+            .getAsDeviceMNode()
+            .addAlias(node.getAsMeasurementMNode().getAlias(), node.getAsMeasurementMNode());
+      }
     }
 
     // Storage type means current node is root node, so it must be returned.
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
index 7184124927..557328326f 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java
@@ -24,9 +24,26 @@ import org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCachePolicy;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(Parameterized.class)
 public class DualKeyCacheTest {
 
+  private final String policy;
+
+  public DualKeyCacheTest(String policy) {
+    this.policy = policy;
+  }
+
+  @Parameterized.Parameters
+  public static List<String> getTestModes() {
+    return Arrays.asList("FIFO", "LRU");
+  }
+
   @Test
   public void testBasicReadPut() {
     DualKeyCacheBuilder<String, String, String> dualKeyCacheBuilder = new DualKeyCacheBuilder<>();