You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/17 03:55:36 UTC

[iotdb] branch master updated: [IOTDB-3062] add last cache in DataNodeSchemaCache (#5906)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f7f857e562 [IOTDB-3062] add last cache in DataNodeSchemaCache (#5906)
f7f857e562 is described below

commit f7f857e56270bf4ab8177379ac74173efa3417d6
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Tue May 17 11:55:31 2022 +0800

    [IOTDB-3062] add last cache in DataNodeSchemaCache (#5906)
---
 .../metadata/cache/DataNodeLastCacheManager.java   | 81 ++++++++++++++++++++++
 .../db/metadata/cache/DataNodeSchemaCache.java     | 34 +++++++++
 .../iotdb/db/metadata/cache/SchemaCacheEntry.java  | 19 +++++
 .../db/metadata/cache/DataNodeSchemaCacheTest.java | 69 ++++++++++++++++++
 4 files changed, 203 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeLastCacheManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeLastCacheManager.java
new file mode 100644
index 0000000000..e886f14622
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeLastCacheManager.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataNodeLastCacheManager {
+  private static final Logger logger = LoggerFactory.getLogger(DataNodeLastCacheManager.class);
+
+  private static final boolean CACHE_ENABLED =
+      IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled();
+
+  /**
+   * Get the cached last value of a time series.
+   *
+   * @param entry schema cache entry in DataNodeSchemaCache
+   * @return the cached last value, or null if the entry is null
+   */
+  public static TimeValuePair getLastCache(SchemaCacheEntry entry) {
+    if (null == entry) {
+      return null;
+    }
+    ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer();
+    return lastCacheContainer.getCachedLast();
+  }
+
+  /**
+   * Update the cached last value of a time series.
+   *
+   * @param entry schema cache entry in DataNodeSchemaCache
+   * @param timeValuePair the latest point value
+   * @param highPriorityUpdate true for a high-priority update, e.g. a last value from insertPlan
+   * @param latestFlushedTime latest flushed time
+   */
+  public static void updateLastCache(
+      SchemaCacheEntry entry,
+      TimeValuePair timeValuePair,
+      boolean highPriorityUpdate,
+      Long latestFlushedTime) {
+    if (null == entry) {
+      return;
+    }
+    ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer();
+    lastCacheContainer.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
+  }
+
+  /**
+   * Reset the cached last value of a time series.
+   *
+   * @param entry schema cache entry in DataNodeSchemaCache
+   */
+  public static void resetLastCache(SchemaCacheEntry entry) {
+    if (null == entry) {
+      return;
+    }
+    ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer();
+    lastCacheContainer.resetLastCache();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 116d2af8fd..f2f6b98c3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import com.github.benmanes.caffeine.cache.Cache;
@@ -96,6 +97,38 @@ public class DataNodeSchemaCache {
     }
   }
 
+  public TimeValuePair getLastCache(PartialPath seriesPath) {
+    SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
+    if (null == entry) {
+      return null;
+    }
+
+    return DataNodeLastCacheManager.getLastCache(entry);
+  }
+
+  public void updateLastCache(
+      PartialPath seriesPath,
+      TimeValuePair timeValuePair,
+      boolean highPriorityUpdate,
+      Long latestFlushedTime) {
+    SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
+    if (null == entry) {
+      return;
+    }
+
+    DataNodeLastCacheManager.updateLastCache(
+        entry, timeValuePair, highPriorityUpdate, latestFlushedTime);
+  }
+
+  public void resetLastCache(PartialPath seriesPath) {
+    SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
+    if (null == entry) {
+      return;
+    }
+
+    DataNodeLastCacheManager.resetLastCache(entry);
+  }
+
   /**
    * For delete timeseries metadata cache operation
    *
@@ -103,6 +136,7 @@ public class DataNodeSchemaCache {
    * @return
    */
   public void invalidate(PartialPath partialPath) {
+    resetLastCache(partialPath);
     cache.invalidate(partialPath);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
index feb8f24eec..b3638ff586 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.metadata.cache;
 
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.db.metadata.lastCache.container.LastCacheContainer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -30,6 +32,8 @@ public class SchemaCacheEntry {
 
   private final boolean isAligned;
 
+  private volatile ILastCacheContainer lastCacheContainer = null;
+
   SchemaCacheEntry(MeasurementSchema measurementSchema, String alias, boolean isAligned) {
     this.measurementSchema = measurementSchema;
     this.alias = alias;
@@ -55,4 +59,19 @@ public class SchemaCacheEntry {
   public boolean isAligned() {
     return isAligned;
   }
+
+  public ILastCacheContainer getLastCacheContainer() {
+    if (lastCacheContainer == null) {
+      synchronized (this) {
+        if (lastCacheContainer == null) {
+          lastCacheContainer = new LastCacheContainer();
+        }
+      }
+    }
+    return lastCacheContainer;
+  }
+
+  public void setLastCacheContainer(ILastCacheContainer lastCacheContainer) {
+    this.lastCacheContainer = lastCacheContainer;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index 91f9fbf375..5e6160c4e8 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.After;
@@ -106,6 +108,73 @@ public class DataNodeSchemaCacheTest {
     Assert.assertEquals(5, dataNodeSchemaCache.estimatedSize());
   }
 
+  @Test
+  public void testLastCache() throws IllegalPathException {
+    // test no cache
+    PartialPath seriesPath1 = new PartialPath("root.sg1.d1.s1");
+    PartialPath seriesPath2 = new PartialPath("root.sg1.d1.s2");
+    PartialPath seriesPath3 = new PartialPath("root.sg1.d1.s3");
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath1));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+    // test no last cache
+    dataNodeSchemaCache.put(generateSchemaTree1());
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath1));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+    // put cache
+    long timestamp = 100;
+    long timestamp2 = 101;
+    TsPrimitiveType value = TsPrimitiveType.getByType(TSDataType.INT32, 101);
+    TsPrimitiveType value2 = TsPrimitiveType.getByType(TSDataType.INT32, 100);
+    TsPrimitiveType value3 = TsPrimitiveType.getByType(TSDataType.INT32, 99);
+
+    // put into last cache when the last cache does not exist yet
+    TimeValuePair timeValuePair = new TimeValuePair(timestamp, value);
+    dataNodeSchemaCache.updateLastCache(seriesPath1, timeValuePair, false, 99L);
+    TimeValuePair cachedTimeValuePair = dataNodeSchemaCache.getLastCache(seriesPath1);
+    Assert.assertNotNull(cachedTimeValuePair);
+    Assert.assertEquals(timestamp, cachedTimeValuePair.getTimestamp());
+    Assert.assertEquals(value, cachedTimeValuePair.getValue());
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+
+    // same time but low priority
+    TimeValuePair timeValuePair2 = new TimeValuePair(timestamp, value2);
+    dataNodeSchemaCache.updateLastCache(seriesPath1, timeValuePair2, false, 100L);
+    TimeValuePair cachedTimeValuePair2 = dataNodeSchemaCache.getLastCache(seriesPath1);
+    Assert.assertNotNull(cachedTimeValuePair2);
+    Assert.assertEquals(timestamp, cachedTimeValuePair2.getTimestamp());
+    Assert.assertEquals(value, cachedTimeValuePair2.getValue());
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+
+    // same time but high priority
+    dataNodeSchemaCache.updateLastCache(seriesPath1, timeValuePair2, true, 100L);
+    cachedTimeValuePair2 = dataNodeSchemaCache.getLastCache(seriesPath1);
+    Assert.assertNotNull(cachedTimeValuePair2);
+    Assert.assertEquals(timestamp, cachedTimeValuePair2.getTimestamp());
+    Assert.assertEquals(value2, cachedTimeValuePair2.getValue());
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+
+    // put into last cache when the last cache already exists
+    TimeValuePair timeValuePair3 = new TimeValuePair(timestamp2, value3);
+    dataNodeSchemaCache.updateLastCache(seriesPath1, timeValuePair3, false, 100L);
+    TimeValuePair cachedTimeValuePair3 = dataNodeSchemaCache.getLastCache(seriesPath1);
+    Assert.assertNotNull(cachedTimeValuePair3);
+    Assert.assertEquals(timestamp2, cachedTimeValuePair3.getTimestamp());
+    Assert.assertEquals(value3, cachedTimeValuePair3.getValue());
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+
+    // invalidate cache
+    dataNodeSchemaCache.invalidate(seriesPath1);
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath1));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+  }
+
   private SchemaTree generateSchemaTree1() throws IllegalPathException {
     SchemaTree schemaTree = new SchemaTree();