You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/15 07:56:58 UTC

[iotdb] branch master updated: [IOTDB-3831] Fix TTL doesn't take effect in last query (#7988)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d9a021da6 [IOTDB-3831] Fix TTL doesn't take effect in last query (#7988)
8d9a021da6 is described below

commit 8d9a021da64d65f5836f4d50ad8edf3689d1bbfb
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Tue Nov 15 15:56:52 2022 +0800

    [IOTDB-3831] Fix TTL doesn't take effect in last query (#7988)
---
 .../iotdb/db/it/last/IoTDBLastWithTTLIT.java       | 106 +++++++++++++++++++++
 .../db/engine/querycontext/QueryDataSource.java    |   5 +
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   4 +
 .../plan/planner/LocalExecutionPlanContext.java    |  13 ++-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |   3 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |   7 +-
 6 files changed, 134 insertions(+), 4 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastWithTTLIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastWithTTLIT.java
new file mode 100644
index 0000000000..3d9b6a75e0
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastWithTTLIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.last;
+
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLastWithTTLIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeClass();
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.sg.d1(time, s1, s2) values(1, 1, 1)");
+      statement.execute("insert into root.sg.d2(time, s1, s2) aligned values(2, 1, 1)");
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  @Test
+  public void withTTL() {
+    String[] retArray =
+        new String[] {
+          "1,root.sg.d1.s1,1.0,FLOAT",
+          "1,root.sg.d1.s2,1.0,FLOAT",
+          "2,root.sg.d2.s1,1.0,FLOAT",
+          "2,root.sg.d2.s2,1.0,FLOAT"
+        };
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select last * from root.sg.* order by timeseries asc")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(ColumnHeaderConstant.TIME)
+                  + ","
+                  + resultSet.getString(ColumnHeaderConstant.TIMESERIES)
+                  + ","
+                  + resultSet.getString(ColumnHeaderConstant.VALUE)
+                  + ","
+                  + resultSet.getString(ColumnHeaderConstant.DATATYPE);
+          assertEquals(retArray[cnt++], ans);
+        }
+        assertEquals(retArray.length, cnt);
+      }
+
+      statement.execute("set ttl to root.sg 1");
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select last * from root.sg.* order by timeseries asc")) {
+        assertFalse(resultSet.next());
+      }
+
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index 995bf93961..63db473dc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -76,6 +76,11 @@ public class QueryDataSource {
 
   /** @return an updated filter concerning TTL */
   public Filter updateFilterUsingTTL(Filter filter) {
+    return updateFilterUsingTTL(filter, dataTTL);
+  }
+
+  /** @return an updated filter concerning TTL */
+  public static Filter updateFilterUsingTTL(Filter filter, long dataTTL) {
     if (dataTTL != Long.MAX_VALUE) {
       if (filter != null) {
         filter = new AndFilter(filter, TimeFilter.gtEq(DateTimeUtils.currentTime() - dataTTL));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 0ba2047e78..984f793922 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -3370,6 +3370,10 @@ public class DataRegion {
     return dataRegionInfo.getMemCost();
   }
 
+  public long getDataTTL() {
+    return dataTTL;
+  }
+
   @TestOnly
   public ILastFlushTimeMap getLastFlushTimeMap() {
     return lastFlushTimeMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 6b644fb431..4a3d844b73 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -46,6 +46,8 @@ public class LocalExecutionPlanContext {
   private final Map<String, Set<String>> allSensorsMap;
   // Used to lock corresponding query resources
   private final List<DataSourceOperator> sourceOperators;
+
+  private final long dataRegionTTL;
   private ISinkHandle sinkHandle;
 
   private int nextOperatorId = 0;
@@ -62,16 +64,19 @@ public class LocalExecutionPlanContext {
 
   private final RuleBasedTimeSliceAllocator timeSliceAllocator;
 
+  // for data region
   public LocalExecutionPlanContext(
-      TypeProvider typeProvider, FragmentInstanceContext instanceContext) {
+      TypeProvider typeProvider, FragmentInstanceContext instanceContext, long dataRegionTTL) {
     this.typeProvider = typeProvider;
     this.instanceContext = instanceContext;
     this.paths = new ArrayList<>();
     this.allSensorsMap = new HashMap<>();
     this.sourceOperators = new ArrayList<>();
     this.timeSliceAllocator = new RuleBasedTimeSliceAllocator();
+    this.dataRegionTTL = dataRegionTTL;
   }
 
+  // for schema region
   public LocalExecutionPlanContext(FragmentInstanceContext instanceContext) {
     this.instanceContext = instanceContext;
     this.paths = new ArrayList<>();
@@ -81,6 +86,8 @@ public class LocalExecutionPlanContext {
 
     // only used in `order by heat`
     this.timeSliceAllocator = new RuleBasedTimeSliceAllocator();
+    // there is no ttl in schema region, so we don't care this field
+    this.dataRegionTTL = Long.MAX_VALUE;
   }
 
   public int getNextOperatorId() {
@@ -158,4 +165,8 @@ public class LocalExecutionPlanContext {
   public boolean isNeedUpdateLastCache() {
     return needUpdateLastCache;
   }
+
+  public long getDataRegionTTL() {
+    return dataRegionTTL;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index f292777567..2121d1547e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -63,7 +63,8 @@ public class LocalExecutionPlanner {
       Filter timeFilter,
       DataRegion dataRegion)
       throws MemoryNotEnoughException {
-    LocalExecutionPlanContext context = new LocalExecutionPlanContext(types, instanceContext);
+    LocalExecutionPlanContext context =
+        new LocalExecutionPlanContext(types, instanceContext, dataRegion.getDataTTL());
 
     Operator root = plan.accept(new OperatorTreeGenerator(), context);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index a02cbd773a..c7964ae828 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -199,6 +199,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.engine.querycontext.QueryDataSource.updateFilterUsingTTL;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
@@ -1618,7 +1619,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     if (timeValuePair == null) { // last value is not cached
       return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
     } else if (!LastQueryUtil.satisfyFilter(
-        context.getLastQueryTimeFilter(), timeValuePair)) { // cached last value is not satisfied
+        updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
+        timeValuePair)) { // cached last value is not satisfied
 
       boolean isFilterGtOrGe =
           (context.getLastQueryTimeFilter() instanceof Gt
@@ -1701,7 +1703,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       return createUpdateLastCacheOperator(
           node, context, node.getSeriesPath().getMeasurementPath());
     } else if (!LastQueryUtil.satisfyFilter(
-        context.getLastQueryTimeFilter(), timeValuePair)) { // cached last value is not satisfied
+        updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
+        timeValuePair)) { // cached last value is not satisfied
 
       boolean isFilterGtOrGe =
           (context.getLastQueryTimeFilter() instanceof Gt