You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/17 03:55:36 UTC
[iotdb] branch master updated: [IOTDB-3062] add last cache in DataNodeSchemaCache (#5906)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f7f857e562 [IOTDB-3062] add last cache in DataNodeSchemaCache (#5906)
f7f857e562 is described below
commit f7f857e56270bf4ab8177379ac74173efa3417d6
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Tue May 17 11:55:31 2022 +0800
[IOTDB-3062] add last cache in DataNodeSchemaCache (#5906)
---
.../metadata/cache/DataNodeLastCacheManager.java | 81 ++++++++++++++++++++++
.../db/metadata/cache/DataNodeSchemaCache.java | 34 +++++++++
.../iotdb/db/metadata/cache/SchemaCacheEntry.java | 19 +++++
.../db/metadata/cache/DataNodeSchemaCacheTest.java | 69 ++++++++++++++++++
4 files changed, 203 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeLastCacheManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeLastCacheManager.java
new file mode 100644
index 0000000000..e886f14622
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeLastCacheManager.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static helpers operating on the last cache attached to a {@link SchemaCacheEntry}. */
+public class DataNodeLastCacheManager {
+  private static final Logger logger = LoggerFactory.getLogger(DataNodeLastCacheManager.class);
+
+  private static final boolean CACHE_ENABLED =
+      IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled();
+
+  /**
+   * Gets the last cached point of a time series.
+   *
+   * @param entry schema cache entry in DataNodeSchemaCache, may be null
+   * @return the cached last point, or null when the last cache is disabled, entry is null, or
+   *     nothing has been cached yet
+   */
+  public static TimeValuePair getLastCache(SchemaCacheEntry entry) {
+    if (!CACHE_ENABLED || null == entry) {
+      return null;
+    }
+    return entry.getLastCacheContainer().getCachedLast();
+  }
+
+  /**
+   * Updates the last cached point of a time series; does nothing if disabled or entry is null.
+   *
+   * @param entry schema cache entry in DataNodeSchemaCache, may be null
+   * @param timeValuePair the latest point value
+   * @param highPriorityUpdate the last value from insertPlan is high priority
+   * @param latestFlushedTime latest flushed time
+   */
+  public static void updateLastCache(
+      SchemaCacheEntry entry,
+      TimeValuePair timeValuePair,
+      boolean highPriorityUpdate,
+      Long latestFlushedTime) {
+    if (!CACHE_ENABLED || null == entry) {
+      return;
+    }
+    ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer();
+    lastCacheContainer.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
+  }
+
+  /**
+   * Resets the last cache of a time series; does nothing if disabled or entry is null.
+   *
+   * @param entry schema cache entry in DataNodeSchemaCache, may be null
+   */
+  public static void resetLastCache(SchemaCacheEntry entry) {
+    if (!CACHE_ENABLED || null == entry) {
+      return;
+    }
+    entry.getLastCacheContainer().resetLastCache();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 116d2af8fd..f2f6b98c3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import com.github.benmanes.caffeine.cache.Cache;
@@ -96,6 +97,38 @@ public class DataNodeSchemaCache {
}
}
+ /**
+  * Looks up the last point cached for {@code seriesPath}; a path without a schema cache entry
+  * yields null, otherwise the result comes from {@link DataNodeLastCacheManager}.
+  */
+ public TimeValuePair getLastCache(PartialPath seriesPath) {
+   final SchemaCacheEntry cached = cache.getIfPresent(seriesPath);
+   return cached == null ? null : DataNodeLastCacheManager.getLastCache(cached);
+ }
+
+ /** Forwards a last-point update to the cached entry of {@code seriesPath}, if one exists. */
+ public void updateLastCache(
+     PartialPath seriesPath,
+     TimeValuePair timeValuePair,
+     boolean highPriorityUpdate,
+     Long latestFlushedTime) {
+   final SchemaCacheEntry cached = cache.getIfPresent(seriesPath);
+   // a path that has no schema cache entry is left untouched
+   if (cached != null) {
+     DataNodeLastCacheManager.updateLastCache(
+         cached, timeValuePair, highPriorityUpdate, latestFlushedTime);
+   }
+ }
+
+ /** Resets the last cache held by the cached entry of {@code seriesPath}, if one exists. */
+ public void resetLastCache(PartialPath seriesPath) {
+   final SchemaCacheEntry cached = cache.getIfPresent(seriesPath);
+   // nothing to reset when the path has no schema cache entry
+   if (cached != null) {
+     DataNodeLastCacheManager.resetLastCache(cached);
+   }
+ }
+
/**
* For delete timeseries meatadata cache operation
*
@@ -103,6 +136,7 @@ public class DataNodeSchemaCache {
* @return
*/
public void invalidate(PartialPath partialPath) {
+ resetLastCache(partialPath); // reset the entry's last cache before the entry is invalidated
cache.invalidate(partialPath);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
index feb8f24eec..b3638ff586 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.metadata.cache;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.db.metadata.lastCache.container.LastCacheContainer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -30,6 +32,8 @@ public class SchemaCacheEntry {
private final boolean isAligned;
+ private volatile ILastCacheContainer lastCacheContainer = null;
+
SchemaCacheEntry(MeasurementSchema measurementSchema, String alias, boolean isAligned) {
this.measurementSchema = measurementSchema;
this.alias = alias;
@@ -55,4 +59,19 @@ public class SchemaCacheEntry {
public boolean isAligned() {
return isAligned;
}
+
+ public ILastCacheContainer getLastCacheContainer() { // creates the container on first access
+ if (lastCacheContainer == null) { // first check of the volatile field, without locking
+ synchronized (this) {
+ if (lastCacheContainer == null) { // second check, now holding this entry's monitor
+ lastCacheContainer = new LastCacheContainer();
+ }
+ }
+ }
+ return lastCacheContainer;
+ }
+
+ public void setLastCacheContainer(ILastCacheContainer lastCacheContainer) {
+ this.lastCacheContainer = lastCacheContainer; // null makes the getter create a new container
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index 91f9fbf375..5e6160c4e8 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -106,6 +108,73 @@ public class DataNodeSchemaCacheTest {
Assert.assertEquals(5, dataNodeSchemaCache.estimatedSize());
}
+ @Test
+ public void testLastCache() throws IllegalPathException {
+ // nothing has been put into the schema cache yet: every last-cache lookup returns null
+ PartialPath seriesPath1 = new PartialPath("root.sg1.d1.s1");
+ PartialPath seriesPath2 = new PartialPath("root.sg1.d1.s2");
+ PartialPath seriesPath3 = new PartialPath("root.sg1.d1.s3");
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath1));
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+ // schema tree is cached now, but no last value has been written: lookups still return null
+ dataNodeSchemaCache.put(generateSchemaTree1());
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath1));
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+ // timestamps and values used by the updates below
+ long timestamp = 100;
+ long timestamp2 = 101;
+ TsPrimitiveType value = TsPrimitiveType.getByType(TSDataType.INT32, 101);
+ TsPrimitiveType value2 = TsPrimitiveType.getByType(TSDataType.INT32, 100);
+ TsPrimitiveType value3 = TsPrimitiveType.getByType(TSDataType.INT32, 99);
+
+ // first update of seriesPath1, whose last cache is still empty: the point is stored
+ TimeValuePair timeValuePair = new TimeValuePair(timestamp, value);
+ dataNodeSchemaCache.updateLastCache(seriesPath1, timeValuePair, false, 99L);
+ TimeValuePair cachedTimeValuePair = dataNodeSchemaCache.getLastCache(seriesPath1);
+ Assert.assertNotNull(cachedTimeValuePair);
+ Assert.assertEquals(timestamp, cachedTimeValuePair.getTimestamp());
+ Assert.assertEquals(value, cachedTimeValuePair.getValue());
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+
+ // same timestamp, low priority: the previously cached value is expected to be kept
+ TimeValuePair timeValuePair2 = new TimeValuePair(timestamp, value2);
+ dataNodeSchemaCache.updateLastCache(seriesPath1, timeValuePair2, false, 100L);
+ TimeValuePair cachedTimeValuePair2 = dataNodeSchemaCache.getLastCache(seriesPath1);
+ Assert.assertNotNull(cachedTimeValuePair2);
+ Assert.assertEquals(timestamp, cachedTimeValuePair2.getTimestamp());
+ Assert.assertEquals(value, cachedTimeValuePair2.getValue());
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+
+ // same timestamp, high priority: the cached value is expected to be overwritten
+ dataNodeSchemaCache.updateLastCache(seriesPath1, timeValuePair2, true, 100L);
+ cachedTimeValuePair2 = dataNodeSchemaCache.getLastCache(seriesPath1);
+ Assert.assertNotNull(cachedTimeValuePair2);
+ Assert.assertEquals(timestamp, cachedTimeValuePair2.getTimestamp());
+ Assert.assertEquals(value2, cachedTimeValuePair2.getValue());
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+
+ // newer timestamp, low priority: the cached point is expected to be replaced
+ TimeValuePair timeValuePair3 = new TimeValuePair(timestamp2, value3);
+ dataNodeSchemaCache.updateLastCache(seriesPath1, timeValuePair3, false, 100L);
+ TimeValuePair cachedTimeValuePair3 = dataNodeSchemaCache.getLastCache(seriesPath1);
+ Assert.assertNotNull(cachedTimeValuePair3);
+ Assert.assertEquals(timestamp2, cachedTimeValuePair3.getTimestamp());
+ Assert.assertEquals(value3, cachedTimeValuePair3.getValue());
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+
+ // invalidate seriesPath1: its last-cache lookup is expected to return null again
+ dataNodeSchemaCache.invalidate(seriesPath1);
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath1));
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath2));
+ Assert.assertNull(dataNodeSchemaCache.getLastCache(seriesPath3));
+ }
+
private SchemaTree generateSchemaTree1() throws IllegalPathException {
SchemaTree schemaTree = new SchemaTree();