You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/07/27 01:03:26 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-3831]Fix ttl on last cache (#6779)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 880438fc57 [To rel/0.13][IOTDB-3831]Fix ttl on last cache (#6779)
880438fc57 is described below
commit 880438fc579ad03453c009d5d0036fea2c5eb810
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Wed Jul 27 09:03:20 2022 +0800
[To rel/0.13][IOTDB-3831]Fix ttl on last cache (#6779)
[To rel/0.13][IOTDB-3831]Fix ttl on last cache (#6779)
---
.../apache/iotdb/db/integration/IoTDBTtlIT.java | 20 ++++++++++++
.../org/apache/iotdb/db/engine/StorageEngine.java | 1 +
.../org/apache/iotdb/db/metadata/MManager.java | 10 ++++++
.../db/metadata/lastCache/LastCacheManager.java | 37 +++++++++++++++++++++-
.../org/apache/iotdb/db/metadata/mtree/MTree.java | 32 +++++++++++++++++++
.../iotdb/db/query/executor/LastQueryExecutor.java | 1 +
.../iotdb/db/metadata/MManagerAdvancedTest.java | 35 ++++++++++++++++++++
7 files changed, 135 insertions(+), 1 deletion(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
index 1f279296a9..ede0442975 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.itbase.category.LocalStandaloneTest;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -220,4 +221,23 @@ public class IoTDBTtlIT {
|| result.equals("root.group2.sgroup1 10000\n" + "root.group1,null\n"));
}
}
+
+ @Test
+ public void testTTLOnLastCache() throws SQLException {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET STORAGE GROUP TO root.TTL_SG1");
+ statement.execute("CREATE TIMESERIES root.TTL_SG1.s1 WITH DATATYPE=INT64,ENCODING=PLAIN");
+
+ long now = System.currentTimeMillis();
+ statement.execute(
+ String.format("INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)", now - 110, 1));
+
+ statement.execute("SET TTL TO root.TTL_SG1 100");
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT last s1 FROM root.TTL_SG1")) {
+ Assert.assertFalse(resultSet.next());
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 7bc4b22e18..034773b55a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -395,6 +395,7 @@ public class StorageEngine implements IService {
private void checkTTL() {
try {
+ IoTDB.metaManager.checkTTLOnLastCache();
for (StorageGroupManager processor : processorMap.values()) {
processor.checkTTL();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 12d89f8bba..2bb80ce64c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1947,6 +1947,16 @@ public class MManager {
node.getAsEntityMNode(), originalPath, startTime, endTime);
}
}
+
+ public void checkTTLOnLastCache() {
+ try {
+ mtree.processMNodeDuringTraversal(
+ new PartialPath(new String[] {"root", "**"}), LastCacheManager::checkTTLOnLastCache);
+ } catch (MetadataException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
// endregion
// region Interfaces and Implementation for InsertPlan process
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java
index 117f5cec07..820b97c74a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java
@@ -61,7 +61,19 @@ public class LastCacheManager {
checkIsTemplateLastCacheAndSetIfAbsent(node);
ILastCacheContainer lastCacheContainer = node.getLastCacheContainer();
- return lastCacheContainer.getCachedLast();
+ TimeValuePair result = lastCacheContainer.getCachedLast();
+ long ttl = getTTL(node);
+ return result == null || ttl == -1 || result.getTimestamp() < System.currentTimeMillis() - ttl
+ ? null
+ : result;
+ }
+
+ private static long getTTL(IMeasurementMNode node) {
+ IMNode ancestor = node;
+ while (ancestor != null && !ancestor.isStorageGroup()) {
+ ancestor = ancestor.getParent();
+ }
+ return ancestor == null ? -1 : ancestor.getAsStorageGroupMNode().getDataTTL();
}
/**
@@ -250,4 +262,27 @@ public class LastCacheManager {
}
}
}
+
+ public static void checkTTLOnLastCache(IMNode node, long dataTTL) {
+ if (node.isMeasurement()) {
+ processTTLOnLastCacheContainer(node.getAsMeasurementMNode().getLastCacheContainer(), dataTTL);
+ return;
+ }
+
+ if (node.isEntity()) {
+ Map<String, ILastCacheContainer> containerMap =
+ node.getAsEntityMNode().getTemplateLastCaches();
+ for (ILastCacheContainer container : containerMap.values()) {
+ processTTLOnLastCacheContainer(container, dataTTL);
+ }
+ }
+ }
+
+ private static void processTTLOnLastCacheContainer(ILastCacheContainer container, long dataTTL) {
+ TimeValuePair timeValuePair = container.getCachedLast();
+ if (timeValuePair != null
+ && timeValuePair.getTimestamp() < System.currentTimeMillis() - dataTTL) {
+ container.resetLastCache();
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
index 4e74440488..eb38dd6cc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.metadata.mnode.InternalMNode;
import org.apache.iotdb.db.metadata.mnode.MNodeUtils;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.metadata.mtree.traverser.Traverser;
import org.apache.iotdb.db.metadata.mtree.traverser.collector.CollectorTraverser;
import org.apache.iotdb.db.metadata.mtree.traverser.collector.EntityCollector;
import org.apache.iotdb.db.metadata.mtree.traverser.collector.MNodeCollector;
@@ -104,6 +105,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
@@ -1969,4 +1971,34 @@ public class MTree implements Serializable {
return res;
}
// endregion
+
+ public void processMNodeDuringTraversal(
+ PartialPath pathPattern, BiConsumer<IMNode, Long> consumer) throws MetadataException {
+ Traverser traverser =
+ new Traverser(root, pathPattern) {
+
+ private long dataTTL = -1;
+
+ @Override
+ protected boolean processInternalMatchedMNode(IMNode node, int idx, int level)
+ throws MetadataException {
+ return processMNode(node);
+ }
+
+ @Override
+ protected boolean processFullMatchedMNode(IMNode node, int idx, int level)
+ throws MetadataException {
+ return processMNode(node);
+ }
+
+ private boolean processMNode(IMNode node) {
+ if (node.isStorageGroup()) {
+ dataTTL = node.getAsStorageGroupMNode().getDataTTL();
+ }
+ consumer.accept(node, dataTTL);
+ return node.isMeasurement();
+ }
+ };
+ traverser.traverse();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index ef28a6dfaf..ed9b7f938e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -199,6 +199,7 @@ public class LastQueryExecutor {
QueryDataSource dataSource =
QueryResourceManager.getInstance()
.getQueryDataSource(nonCachedPaths.get(i), context, filter, ascending);
+ filter = dataSource.updateFilterUsingTTL(filter);
LastPointReader lastReader =
nonCachedPaths
.get(i)
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
index 344bb4804a..250c428d4d 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
@@ -37,6 +37,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -225,4 +226,38 @@ public class MManagerAdvancedTest {
LastCacheManager.updateLastCache(node, tv3, true, Long.MIN_VALUE);
Assert.assertEquals(tv2.getTimestamp(), mmanager.getLastCache(node).getTimestamp());
}
+
+ @Test
+ public void testLastCacheWithTTL() throws MetadataException, IOException {
+ mmanager.createTimeseries(
+ new PartialPath("root.sg1.d2.s0"),
+ TSDataType.DOUBLE,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ mmanager.createTimeseries(
+ new PartialPath("root.sg2.d2.s0"),
+ TSDataType.DOUBLE,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+
+ long timestamp = System.currentTimeMillis() - 1500;
+ TimeValuePair tv =
+ new TimeValuePair(timestamp, TsPrimitiveType.getByType(TSDataType.DOUBLE, 1.0));
+ IMeasurementMNode node1 = mmanager.getMeasurementMNode(new PartialPath("root.sg1.d2.s0"));
+ LastCacheManager.updateLastCache(node1, tv, true, Long.MIN_VALUE);
+ IMeasurementMNode node2 = mmanager.getMeasurementMNode(new PartialPath("root.sg2.d2.s0"));
+ LastCacheManager.updateLastCache(node2, tv, true, Long.MIN_VALUE);
+ mmanager.setTTL(new PartialPath("root.sg1"), 2000);
+ mmanager.setTTL(new PartialPath("root.sg2"), 500);
+
+ mmanager.checkTTLOnLastCache();
+ Assert.assertNotNull(mmanager.getLastCache(node1));
+ Assert.assertNull(mmanager.getLastCache(node2));
+
+ mmanager.setTTL(new PartialPath("root.sg1"), 1000);
+ mmanager.checkTTLOnLastCache();
+ Assert.assertNull(mmanager.getLastCache(node1));
+ }
}