You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/07/27 01:03:26 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-3831]Fix ttl on last cache (#6779)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 880438fc57 [To rel/0.13][IOTDB-3831]Fix ttl on last cache (#6779)
880438fc57 is described below

commit 880438fc579ad03453c009d5d0036fea2c5eb810
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Wed Jul 27 09:03:20 2022 +0800

    [To rel/0.13][IOTDB-3831]Fix ttl on last cache (#6779)
    
    [To rel/0.13][IOTDB-3831]Fix ttl on last cache (#6779)
---
 .../apache/iotdb/db/integration/IoTDBTtlIT.java    | 20 ++++++++++++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  1 +
 .../org/apache/iotdb/db/metadata/MManager.java     | 10 ++++++
 .../db/metadata/lastCache/LastCacheManager.java    | 37 +++++++++++++++++++++-
 .../org/apache/iotdb/db/metadata/mtree/MTree.java  | 32 +++++++++++++++++++
 .../iotdb/db/query/executor/LastQueryExecutor.java |  1 +
 .../iotdb/db/metadata/MManagerAdvancedTest.java    | 35 ++++++++++++++++++++
 7 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
index 1f279296a9..ede0442975 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.itbase.category.LocalStandaloneTest;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -220,4 +221,23 @@ public class IoTDBTtlIT {
               || result.equals("root.group2.sgroup1 10000\n" + "root.group1,null\n"));
     }
   }
+
+  @Test
+  public void testTTLOnLastCache() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.TTL_SG1");
+      statement.execute("CREATE TIMESERIES root.TTL_SG1.s1 WITH DATATYPE=INT64,ENCODING=PLAIN");
+
+      long now = System.currentTimeMillis();
+      statement.execute(
+          String.format("INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)", now - 110, 1));
+
+      statement.execute("SET TTL TO root.TTL_SG1 100");
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT last s1 FROM root.TTL_SG1")) {
+        Assert.assertFalse(resultSet.next());
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 7bc4b22e18..034773b55a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -395,6 +395,7 @@ public class StorageEngine implements IService {
 
   private void checkTTL() {
     try {
+      IoTDB.metaManager.checkTTLOnLastCache();
       for (StorageGroupManager processor : processorMap.values()) {
         processor.checkTTL();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 12d89f8bba..2bb80ce64c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1947,6 +1947,16 @@ public class MManager {
           node.getAsEntityMNode(), originalPath, startTime, endTime);
     }
   }
+
+  public void checkTTLOnLastCache() {
+    try {
+      mtree.processMNodeDuringTraversal(
+          new PartialPath(new String[] {"root", "**"}), LastCacheManager::checkTTLOnLastCache);
+    } catch (MetadataException e) {
+      logger.error(e.getMessage(), e);
+    }
+  }
+
   // endregion
 
   // region Interfaces and Implementation for InsertPlan process
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java
index 117f5cec07..820b97c74a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java
@@ -61,7 +61,19 @@ public class LastCacheManager {
     checkIsTemplateLastCacheAndSetIfAbsent(node);
 
     ILastCacheContainer lastCacheContainer = node.getLastCacheContainer();
-    return lastCacheContainer.getCachedLast();
+    TimeValuePair result = lastCacheContainer.getCachedLast();
+    long ttl = getTTL(node);
+    return result == null || ttl == -1 || result.getTimestamp() < System.currentTimeMillis() - ttl
+        ? null
+        : result;
+  }
+
+  private static long getTTL(IMeasurementMNode node) {
+    IMNode ancestor = node;
+    while (ancestor != null && !ancestor.isStorageGroup()) {
+      ancestor = ancestor.getParent();
+    }
+    return ancestor == null ? -1 : ancestor.getAsStorageGroupMNode().getDataTTL();
   }
 
   /**
@@ -250,4 +262,27 @@ public class LastCacheManager {
       }
     }
   }
+
+  public static void checkTTLOnLastCache(IMNode node, long dataTTL) {
+    if (node.isMeasurement()) {
+      processTTLOnLastCacheContainer(node.getAsMeasurementMNode().getLastCacheContainer(), dataTTL);
+      return;
+    }
+
+    if (node.isEntity()) {
+      Map<String, ILastCacheContainer> containerMap =
+          node.getAsEntityMNode().getTemplateLastCaches();
+      for (ILastCacheContainer container : containerMap.values()) {
+        processTTLOnLastCacheContainer(container, dataTTL);
+      }
+    }
+  }
+
+  private static void processTTLOnLastCacheContainer(ILastCacheContainer container, long dataTTL) {
+    TimeValuePair timeValuePair = container.getCachedLast();
+    if (timeValuePair != null
+        && timeValuePair.getTimestamp() < System.currentTimeMillis() - dataTTL) {
+      container.resetLastCache();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
index 4e74440488..eb38dd6cc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.metadata.mnode.InternalMNode;
 import org.apache.iotdb.db.metadata.mnode.MNodeUtils;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.metadata.mtree.traverser.Traverser;
 import org.apache.iotdb.db.metadata.mtree.traverser.collector.CollectorTraverser;
 import org.apache.iotdb.db.metadata.mtree.traverser.collector.EntityCollector;
 import org.apache.iotdb.db.metadata.mtree.traverser.collector.MNodeCollector;
@@ -104,6 +105,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toList;
@@ -1969,4 +1971,34 @@ public class MTree implements Serializable {
     return res;
   }
   // endregion
+
+  public void processMNodeDuringTraversal(
+      PartialPath pathPattern, BiConsumer<IMNode, Long> consumer) throws MetadataException {
+    Traverser traverser =
+        new Traverser(root, pathPattern) {
+
+          private long dataTTL = -1;
+
+          @Override
+          protected boolean processInternalMatchedMNode(IMNode node, int idx, int level)
+              throws MetadataException {
+            return processMNode(node);
+          }
+
+          @Override
+          protected boolean processFullMatchedMNode(IMNode node, int idx, int level)
+              throws MetadataException {
+            return processMNode(node);
+          }
+
+          private boolean processMNode(IMNode node) {
+            if (node.isStorageGroup()) {
+              dataTTL = node.getAsStorageGroupMNode().getDataTTL();
+            }
+            consumer.accept(node, dataTTL);
+            return node.isMeasurement();
+          }
+        };
+    traverser.traverse();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index ef28a6dfaf..ed9b7f938e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -199,6 +199,7 @@ public class LastQueryExecutor {
       QueryDataSource dataSource =
           QueryResourceManager.getInstance()
               .getQueryDataSource(nonCachedPaths.get(i), context, filter, ascending);
+      filter = dataSource.updateFilterUsingTTL(filter);
       LastPointReader lastReader =
           nonCachedPaths
               .get(i)
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
index 344bb4804a..250c428d4d 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
@@ -37,6 +37,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
@@ -225,4 +226,38 @@ public class MManagerAdvancedTest {
     LastCacheManager.updateLastCache(node, tv3, true, Long.MIN_VALUE);
     Assert.assertEquals(tv2.getTimestamp(), mmanager.getLastCache(node).getTimestamp());
   }
+
+  @Test
+  public void testLastCacheWithTTL() throws MetadataException, IOException {
+    mmanager.createTimeseries(
+        new PartialPath("root.sg1.d2.s0"),
+        TSDataType.DOUBLE,
+        TSEncoding.RLE,
+        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        Collections.emptyMap());
+    mmanager.createTimeseries(
+        new PartialPath("root.sg2.d2.s0"),
+        TSDataType.DOUBLE,
+        TSEncoding.RLE,
+        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        Collections.emptyMap());
+
+    long timestamp = System.currentTimeMillis() - 1500;
+    TimeValuePair tv =
+        new TimeValuePair(timestamp, TsPrimitiveType.getByType(TSDataType.DOUBLE, 1.0));
+    IMeasurementMNode node1 = mmanager.getMeasurementMNode(new PartialPath("root.sg1.d2.s0"));
+    LastCacheManager.updateLastCache(node1, tv, true, Long.MIN_VALUE);
+    IMeasurementMNode node2 = mmanager.getMeasurementMNode(new PartialPath("root.sg2.d2.s0"));
+    LastCacheManager.updateLastCache(node2, tv, true, Long.MIN_VALUE);
+    mmanager.setTTL(new PartialPath("root.sg1"), 2000);
+    mmanager.setTTL(new PartialPath("root.sg2"), 500);
+
+    mmanager.checkTTLOnLastCache();
+    Assert.assertNotNull(mmanager.getLastCache(node1));
+    Assert.assertNull(mmanager.getLastCache(node2));
+
+    mmanager.setTTL(new PartialPath("root.sg1"), 1000);
+    mmanager.checkTTLOnLastCache();
+    Assert.assertNull(mmanager.getLastCache(node1));
+  }
 }