You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/15 02:43:59 UTC
[iotdb] 01/01: [IOTDB-3831] Fix TTL doesn't take effect in last query
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-3831
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 70f704ea57629b3b7c4bd760b7d7204f7a1586e6
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Nov 15 10:43:44 2022 +0800
[IOTDB-3831] Fix TTL doesn't take effect in last query
---
.../iotdb/db/it/last/IoTDBLastWithTTLIT.java | 106 +++++++++++++++++++++
.../db/engine/querycontext/QueryDataSource.java | 5 +
.../iotdb/db/engine/storagegroup/DataRegion.java | 4 +
.../plan/planner/LocalExecutionPlanContext.java | 13 ++-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 3 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 7 +-
6 files changed, 134 insertions(+), 4 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastWithTTLIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastWithTTLIT.java
new file mode 100644
index 0000000000..3d9b6a75e0
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastWithTTLIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.last;
+
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLastWithTTLIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initBeforeClass();
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("insert into root.sg.d1(time, s1, s2) values(1, 1, 1)");
+ statement.execute("insert into root.sg.d2(time, s1, s2) aligned values(2, 1, 1)");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanAfterClass();
+ }
+
+ @Test
+ public void withTTL() {
+ String[] retArray =
+ new String[] {
+ "1,root.sg.d1.s1,1.0,FLOAT",
+ "1,root.sg.d1.s2,1.0,FLOAT",
+ "2,root.sg.d2.s1,1.0,FLOAT",
+ "2,root.sg.d2.s2,1.0,FLOAT"
+ };
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ try (ResultSet resultSet =
+ statement.executeQuery("select last * from root.sg.* order by timeseries asc")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(ColumnHeaderConstant.TIME)
+ + ","
+ + resultSet.getString(ColumnHeaderConstant.TIMESERIES)
+ + ","
+ + resultSet.getString(ColumnHeaderConstant.VALUE)
+ + ","
+ + resultSet.getString(ColumnHeaderConstant.DATATYPE);
+ assertEquals(retArray[cnt++], ans);
+ }
+ assertEquals(retArray.length, cnt);
+ }
+
+ statement.execute("set ttl to root.sg 1");
+
+ try (ResultSet resultSet =
+ statement.executeQuery("select last * from root.sg.* order by timeseries asc")) {
+ assertFalse(resultSet.next());
+ }
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index 995bf93961..63db473dc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -76,6 +76,11 @@ public class QueryDataSource {
/** @return an updated filter concerning TTL */
public Filter updateFilterUsingTTL(Filter filter) {
+ return updateFilterUsingTTL(filter, dataTTL);
+ }
+
+ /** @return the filter updated for the given {@code dataTTL}; {@code Long.MAX_VALUE} means no TTL is applied */
+ public static Filter updateFilterUsingTTL(Filter filter, long dataTTL) {
if (dataTTL != Long.MAX_VALUE) {
if (filter != null) {
filter = new AndFilter(filter, TimeFilter.gtEq(DateTimeUtils.currentTime() - dataTTL));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 3bcf0a916e..d252a70280 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -3369,6 +3369,10 @@ public class DataRegion {
return dataRegionInfo.getMemCost();
}
+ public long getDataTTL() {
+ return dataTTL;
+ }
+
@TestOnly
public ILastFlushTimeMap getLastFlushTimeMap() {
return lastFlushTimeMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 6b644fb431..4a3d844b73 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -46,6 +46,8 @@ public class LocalExecutionPlanContext {
private final Map<String, Set<String>> allSensorsMap;
// Used to lock corresponding query resources
private final List<DataSourceOperator> sourceOperators;
+
+ private final long dataRegionTTL;
private ISinkHandle sinkHandle;
private int nextOperatorId = 0;
@@ -62,16 +64,19 @@ public class LocalExecutionPlanContext {
private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+ // for data region
public LocalExecutionPlanContext(
- TypeProvider typeProvider, FragmentInstanceContext instanceContext) {
+ TypeProvider typeProvider, FragmentInstanceContext instanceContext, long dataRegionTTL) {
this.typeProvider = typeProvider;
this.instanceContext = instanceContext;
this.paths = new ArrayList<>();
this.allSensorsMap = new HashMap<>();
this.sourceOperators = new ArrayList<>();
this.timeSliceAllocator = new RuleBasedTimeSliceAllocator();
+ this.dataRegionTTL = dataRegionTTL;
}
+ // for schema region
public LocalExecutionPlanContext(FragmentInstanceContext instanceContext) {
this.instanceContext = instanceContext;
this.paths = new ArrayList<>();
@@ -81,6 +86,8 @@ public class LocalExecutionPlanContext {
// only used in `order by heat`
this.timeSliceAllocator = new RuleBasedTimeSliceAllocator();
+ // there is no TTL in a schema region, so this field is irrelevant here
+ this.dataRegionTTL = Long.MAX_VALUE;
}
public int getNextOperatorId() {
@@ -158,4 +165,8 @@ public class LocalExecutionPlanContext {
public boolean isNeedUpdateLastCache() {
return needUpdateLastCache;
}
+
+ public long getDataRegionTTL() {
+ return dataRegionTTL;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index f292777567..2121d1547e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -63,7 +63,8 @@ public class LocalExecutionPlanner {
Filter timeFilter,
DataRegion dataRegion)
throws MemoryNotEnoughException {
- LocalExecutionPlanContext context = new LocalExecutionPlanContext(types, instanceContext);
+ LocalExecutionPlanContext context =
+ new LocalExecutionPlanContext(types, instanceContext, dataRegion.getDataTTL());
Operator root = plan.accept(new OperatorTreeGenerator(), context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index a02cbd773a..c7964ae828 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -199,6 +199,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.engine.querycontext.QueryDataSource.updateFilterUsingTTL;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
@@ -1618,7 +1619,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
if (timeValuePair == null) { // last value is not cached
return createUpdateLastCacheOperator(node, context, node.getSeriesPath());
} else if (!LastQueryUtil.satisfyFilter(
- context.getLastQueryTimeFilter(), timeValuePair)) { // cached last value is not satisfied
+ updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
+ timeValuePair)) { // cached last value is not satisfied
boolean isFilterGtOrGe =
(context.getLastQueryTimeFilter() instanceof Gt
@@ -1701,7 +1703,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return createUpdateLastCacheOperator(
node, context, node.getSeriesPath().getMeasurementPath());
} else if (!LastQueryUtil.satisfyFilter(
- context.getLastQueryTimeFilter(), timeValuePair)) { // cached last value is not satisfied
+ updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
+ timeValuePair)) { // cached last value is not satisfied
boolean isFilterGtOrGe =
(context.getLastQueryTimeFilter() instanceof Gt