You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/12 03:34:53 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13] [ISSUE-5773] Fix missing data in group by query (#5796)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 8f46e1b939 [To rel/0.13] [ISSUE-5773] Fix missing data in group by query (#5796)
8f46e1b939 is described below
commit 8f46e1b93935ef6b5dcf86fd8ecf19a11f10b5bf
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Thu May 12 11:34:49 2022 +0800
[To rel/0.13] [ISSUE-5773] Fix missing data in group by query (#5796)
---
.../IoTDBGroupByQueryWithoutValueFilterIT.java | 77 ++++++++++++++++++++++
.../db/query/aggregation/AggregateResult.java | 7 +-
.../db/query/aggregation/impl/AvgAggrResult.java | 11 ++--
.../db/query/aggregation/impl/CountAggrResult.java | 9 ++-
.../query/aggregation/impl/ExtremeAggrResult.java | 12 ++--
.../aggregation/impl/FirstValueAggrResult.java | 10 +--
.../aggregation/impl/FirstValueDescAggrResult.java | 8 +--
.../aggregation/impl/LastValueAggrResult.java | 10 +--
.../aggregation/impl/LastValueDescAggrResult.java | 8 +--
.../query/aggregation/impl/MaxTimeAggrResult.java | 10 +--
.../aggregation/impl/MaxTimeDescAggrResult.java | 8 +--
.../query/aggregation/impl/MaxValueAggrResult.java | 10 +--
.../query/aggregation/impl/MinTimeAggrResult.java | 10 +--
.../aggregation/impl/MinTimeDescAggrResult.java | 6 +-
.../query/aggregation/impl/MinValueAggrResult.java | 10 +--
.../db/query/aggregation/impl/SumAggrResult.java | 11 ++--
.../groupby/LocalAlignedGroupByExecutor.java | 43 +++++++-----
.../dataset/groupby/LocalGroupByExecutor.java | 9 +--
.../java/org/apache/iotdb/db/utils/QueryUtils.java | 9 +++
.../query/aggregation/DescAggregateResultTest.java | 9 +--
.../apache/iotdb/tsfile/read/common/BatchData.java | 17 +++++
.../tsfile/read/common/IBatchDataIterator.java | 12 ++++
22 files changed, 221 insertions(+), 95 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithoutValueFilterIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithoutValueFilterIT.java
index 81862999ab..a93a4771aa 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithoutValueFilterIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithoutValueFilterIT.java
@@ -1088,6 +1088,83 @@ public class IoTDBGroupByQueryWithoutValueFilterIT {
}
}
+ @Test
+ public void groupByBigDataTest() throws SQLException {
+ String[] retArray =
+ new String[] {
+ "0,256,256,0.0,0.0,255.0,255.0",
+ "256,256,256,256.0,256.0,511.0,511.0",
+ "512,256,256,512.0,512.0,767.0,767.0",
+ "768,256,256,768.0,768.0,1023.0,1023.0",
+ "1024,256,256,1024.0,1024.0,1279.0,1279.0"
+ };
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ for (int i = 0; i < 1281; i++) {
+ statement.execute(
+ String.format(
+ "insert into root.sg2.d1(time, s1, s2) aligned values(%s, %s, %s)", i, i, i));
+ }
+ statement.execute("flush");
+
+ boolean hasResultSet =
+ statement.execute(
+ "select count(*), first_value(*), last_value(*) from root.sg2.d1 "
+ + "GROUP BY ([0, 1280), 256ms)");
+ Assert.assertTrue(hasResultSet);
+
+ int cnt = 0;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg2.d1.s1"))
+ + ","
+ + resultSet.getString(count("root.sg2.d1.s2"))
+ + ","
+ + resultSet.getString(firstValue("root.sg2.d1.s1"))
+ + ","
+ + resultSet.getString(firstValue("root.sg2.d1.s2"))
+ + ","
+ + resultSet.getString(lastValue("root.sg2.d1.s1"))
+ + ","
+ + resultSet.getString(lastValue("root.sg2.d1.s2"));
+ Assert.assertEquals(retArray[cnt++], ans);
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select count(*), first_value(*), last_value(*) from root.sg2.d1 "
+ + "GROUP BY ([0, 1280), 256ms) order by time desc");
+ Assert.assertTrue(hasResultSet);
+
+ cnt = retArray.length - 1;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg2.d1.s1"))
+ + ","
+ + resultSet.getString(count("root.sg2.d1.s2"))
+ + ","
+ + resultSet.getString(firstValue("root.sg2.d1.s1"))
+ + ","
+ + resultSet.getString(firstValue("root.sg2.d1.s2"))
+ + ","
+ + resultSet.getString(lastValue("root.sg2.d1.s1"))
+ + ","
+ + resultSet.getString(lastValue("root.sg2.d1.s2"));
+ Assert.assertEquals(retArray[cnt--], ans);
+ }
+ Assert.assertEquals(0, cnt + 1);
+ }
+ }
+ }
+
@Test
public void groupByWithoutAggregationFuncTest() {
try (Connection connection = EnvFactory.getEnv().getConnection();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7daa1e8d60..8719a91dee 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public abstract class AggregateResult {
@@ -84,11 +85,11 @@ public abstract class AggregateResult {
* This method is used in GROUP BY aggregation query.
*
* @param batchIterator the data in Page
- * @param minBound calculate points whose time >= bound
- * @param maxBound calculate points whose time < bound
+ * @param boundPredicate used to judge whether the current timestamp is out of time range, returns
+ * true if it is.
*/
public abstract void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) throws IOException;
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) throws IOException;
/**
* This method calculates the aggregation using common timestamps of the cross series filter.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 49fc5efa46..7046559cf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public class AvgAggrResult extends AggregateResult {
@@ -84,16 +85,14 @@ public class AvgAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(IBatchDataIterator batchIterator) {
- updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ updateResultFromPageData(batchIterator, time -> false);
}
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
- while (batchIterator.hasNext(minBound, maxBound)) {
- if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
- break;
- }
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+ while (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
updateAvg(seriesDataType, batchIterator.currentValue());
batchIterator.next();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index ec279729f9..d1745e2d79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public class CountAggrResult extends AggregateResult {
@@ -56,12 +57,10 @@ public class CountAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
int cnt = 0;
- while (batchIterator.hasNext(minBound, maxBound)) {
- if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
- break;
- }
+ while (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
cnt++;
batchIterator.next();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
index 8b40461331..3fa84bad55 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public class ExtremeAggrResult extends AggregateResult {
@@ -95,18 +96,15 @@ public class ExtremeAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(IBatchDataIterator batchIterator) {
- updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ updateResultFromPageData(batchIterator, time -> false);
}
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
Comparable<Object> extVal = null;
-
- while (batchIterator.hasNext(minBound, maxBound)
- && batchIterator.currentTime() < maxBound
- && batchIterator.currentTime() >= minBound) {
-
+ while (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
extVal = getExtremeValue(extVal, (Comparable<Object>) batchIterator.currentValue());
batchIterator.next();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index c774636e57..97ad1e04b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public class FirstValueAggrResult extends AggregateResult {
@@ -66,18 +67,17 @@ public class FirstValueAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(IBatchDataIterator batchIterator) {
- updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ updateResultFromPageData(batchIterator, time -> false);
}
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
if (hasFinalResult()) {
return;
}
- if (batchIterator.hasNext(minBound, maxBound)
- && batchIterator.currentTime() < maxBound
- && batchIterator.currentTime() >= minBound) {
+ if (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
setValue(batchIterator.currentValue());
timestamp = batchIterator.currentTime();
batchIterator.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 2c2e60d94a..3be3f9eaa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
+import java.util.function.Predicate;
public class FirstValueDescAggrResult extends FirstValueAggrResult {
@@ -42,10 +43,9 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
- while (batchIterator.hasNext(minBound, maxBound)
- && batchIterator.currentTime() < maxBound
- && batchIterator.currentTime() >= minBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+ while (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
setValue(batchIterator.currentValue());
timestamp = batchIterator.currentTime();
batchIterator.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 7fab03c3aa..7309cd5f05 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public class LastValueAggrResult extends AggregateResult {
@@ -62,17 +63,16 @@ public class LastValueAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(IBatchDataIterator batchIterator) {
- updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ updateResultFromPageData(batchIterator, time -> false);
}
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
long time = Long.MIN_VALUE;
Object lastVal = null;
- while (batchIterator.hasNext(minBound, maxBound)
- && batchIterator.currentTime() < maxBound
- && batchIterator.currentTime() >= minBound) {
+ while (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
time = batchIterator.currentTime();
lastVal = batchIterator.currentValue();
batchIterator.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 6fb635543b..4bd7153e03 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
+import java.util.function.Predicate;
public class LastValueDescAggrResult extends LastValueAggrResult {
@@ -44,15 +45,14 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
if (hasFinalResult()) {
return;
}
long time = Long.MIN_VALUE;
Object lastVal = null;
- if (batchIterator.hasNext(minBound, maxBound)
- && batchIterator.currentTime() < maxBound
- && batchIterator.currentTime() >= minBound) {
+ if (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
time = batchIterator.currentTime();
lastVal = batchIterator.currentValue();
batchIterator.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index c98ed969a1..58b544164a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public class MaxTimeAggrResult extends AggregateResult {
@@ -51,15 +52,14 @@ public class MaxTimeAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(IBatchDataIterator batchIterator) {
- updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ updateResultFromPageData(batchIterator, time -> false);
}
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
- while (batchIterator.hasNext(minBound, maxBound)
- && batchIterator.currentTime() < maxBound
- && batchIterator.currentTime() >= minBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+ while (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
updateMaxTimeResult(batchIterator.currentTime());
batchIterator.next();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index c4992542e6..b0d8e66ed6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
+import java.util.function.Predicate;
public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
@@ -37,13 +38,12 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
if (hasFinalResult()) {
return;
}
- if (batchIterator.hasNext(minBound, maxBound)
- && batchIterator.currentTime() < maxBound
- && batchIterator.currentTime() >= minBound) {
+ if (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
updateMaxTimeResult(batchIterator.currentTime());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index 8ee636daea..7f9c5b8e43 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public class MaxValueAggrResult extends AggregateResult {
@@ -51,17 +52,16 @@ public class MaxValueAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(IBatchDataIterator batchIterator) {
- updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ updateResultFromPageData(batchIterator, time -> false);
}
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
Comparable<Object> maxVal = null;
- while (batchIterator.hasNext(minBound, maxBound)
- && batchIterator.currentTime() < maxBound
- && batchIterator.currentTime() >= minBound) {
+ while (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
if (maxVal == null || maxVal.compareTo(batchIterator.currentValue()) < 0) {
maxVal = (Comparable<Object>) batchIterator.currentValue();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 9521e363cd..630ef4bfbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public class MinTimeAggrResult extends AggregateResult {
@@ -54,18 +55,17 @@ public class MinTimeAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(IBatchDataIterator batchIterator) {
- updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ updateResultFromPageData(batchIterator, time -> false);
}
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
if (hasFinalResult()) {
return;
}
- if (batchIterator.hasNext(minBound, maxBound)
- && batchIterator.currentTime() < maxBound
- && batchIterator.currentTime() >= minBound) {
+ if (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
setLongValue(batchIterator.currentTime());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 4a9ae2c909..84bb1a231e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
+import java.util.function.Predicate;
public class MinTimeDescAggrResult extends MinTimeAggrResult {
@@ -35,8 +36,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
- while (batchIterator.hasNext(minBound, maxBound) && batchIterator.currentTime() >= minBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+ while (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
setValue(batchIterator.currentTime());
batchIterator.next();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index aa06d78075..8f2bd7ff6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public class MinValueAggrResult extends AggregateResult {
@@ -51,15 +52,14 @@ public class MinValueAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(IBatchDataIterator batchIterator) {
- updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ updateResultFromPageData(batchIterator, time -> false);
}
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
- while (batchIterator.hasNext(minBound, maxBound)
- && batchIterator.currentTime() < maxBound
- && batchIterator.currentTime() >= minBound) {
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+ while (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
updateResult((Comparable<Object>) batchIterator.currentValue());
batchIterator.next();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index c35fbfc050..0f10d0e84f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Predicate;
public class SumAggrResult extends AggregateResult {
@@ -65,16 +66,14 @@ public class SumAggrResult extends AggregateResult {
@Override
public void updateResultFromPageData(IBatchDataIterator batchIterator) {
- updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+ updateResultFromPageData(batchIterator, time -> false);
}
@Override
public void updateResultFromPageData(
- IBatchDataIterator batchIterator, long minBound, long maxBound) {
- while (batchIterator.hasNext(minBound, maxBound)) {
- if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
- break;
- }
+ IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+ while (batchIterator.hasNext(boundPredicate)
+ && !boundPredicate.test(batchIterator.currentTime())) {
updateSum(batchIterator.currentValue());
batchIterator.next();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
index bebb354f14..87faffebbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader;
+import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -41,6 +42,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.function.Predicate;
public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
@@ -58,8 +60,6 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
private final boolean ascending;
- private final QueryDataSource queryDataSource;
-
public LocalAlignedGroupByExecutor(
PartialPath path,
QueryContext context,
@@ -67,7 +67,7 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
TsFileFilter fileFilter,
boolean ascending)
throws StorageEngineException, QueryProcessException {
- queryDataSource =
+ QueryDataSource queryDataSource =
QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter, ascending);
// update filter by TTL
timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
@@ -245,9 +245,7 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
// set initial Index
lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
- ;
lastReadCurListIndex = batchData.getReadCurListIndex();
- ;
// stop calc and cached current batchData
if (ascending && batchData.currentTime() >= curEndTime) {
@@ -289,37 +287,50 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
boolean hasCached = false;
int curReadCurArrayIndex = lastReadCurArrayIndex;
+ int curReadCurListIndex = lastReadCurListIndex;
+ Predicate<Long> iteratorPredicate =
+ QueryUtils.getPredicate(curStartTime, curEndTime, ascending);
while (reader.hasNextSubSeries()) {
int subIndex = reader.getCurIndex();
- batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
List<AggregateResult> aggregateResultList = results.get(subIndex);
for (AggregateResult result : aggregateResultList) {
// current agg method has been calculated
if (result.hasFinalResult()) {
continue;
}
- // lazy reset batch data for calculation
+ // reset batch data for calculation
batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
IBatchDataIterator batchDataIterator = batchData.getBatchDataIterator(subIndex);
if (ascending) {
// skip points that cannot be calculated
- while (batchDataIterator.hasNext(curStartTime, curEndTime)
+ while (batchDataIterator.hasNext(iteratorPredicate)
&& batchDataIterator.currentTime() < curStartTime) {
batchDataIterator.next();
}
} else {
- while (batchDataIterator.hasNext(curStartTime, curEndTime)
+ while (batchDataIterator.hasNext(iteratorPredicate)
&& batchDataIterator.currentTime() >= curEndTime) {
batchDataIterator.next();
}
}
- if (batchDataIterator.hasNext(curStartTime, curEndTime)) {
- result.updateResultFromPageData(batchDataIterator, curStartTime, curEndTime);
+ if (batchDataIterator.hasNext(iteratorPredicate)) {
+ result.updateResultFromPageData(batchDataIterator, iteratorPredicate);
+ }
+ if (ascending) {
+ if (batchData.getReadCurListIndex() > curReadCurListIndex) {
+ curReadCurListIndex = batchData.getReadCurListIndex();
+ curReadCurArrayIndex = batchData.getReadCurArrayIndex();
+ } else if (batchData.getReadCurListIndex() == curReadCurListIndex) {
+ curReadCurArrayIndex = Math.max(batchData.getReadCurArrayIndex(), curReadCurArrayIndex);
+ }
+ } else {
+ if (batchData.getReadCurListIndex() < curReadCurListIndex) {
+ curReadCurListIndex = batchData.getReadCurListIndex();
+ curReadCurArrayIndex = batchData.getReadCurArrayIndex();
+ } else if (batchData.getReadCurListIndex() == curReadCurListIndex) {
+ curReadCurArrayIndex = Math.min(batchData.getReadCurArrayIndex(), curReadCurArrayIndex);
+ }
}
- curReadCurArrayIndex =
- ascending
- ? Math.max(curReadCurArrayIndex, batchData.getReadCurArrayIndex())
- : Math.min(curReadCurArrayIndex, batchData.getReadCurArrayIndex());
}
// can calc for next interval
if (!hasCached && batchData.hasCurrent()) {
@@ -331,7 +342,7 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
// reset the last position to current Index
lastReadCurArrayIndex = curReadCurArrayIndex;
- lastReadCurListIndex = batchData.getReadCurListIndex();
+ lastReadCurListIndex = curReadCurListIndex;
batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
index 22563f8823..fd692c410a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.IAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
+import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -40,6 +41,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.function.Predicate;
public class LocalGroupByExecutor implements GroupByExecutor {
@@ -55,8 +57,6 @@ public class LocalGroupByExecutor implements GroupByExecutor {
private int lastReadCurListIndex;
private final boolean ascending;
- private final QueryDataSource queryDataSource;
-
public LocalGroupByExecutor(
PartialPath path,
Set<String> allSensors,
@@ -65,7 +65,7 @@ public class LocalGroupByExecutor implements GroupByExecutor {
TsFileFilter fileFilter,
boolean ascending)
throws StorageEngineException, QueryProcessException {
- queryDataSource =
+ QueryDataSource queryDataSource =
QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter, ascending);
// update filter by TTL
timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
@@ -125,6 +125,7 @@ public class LocalGroupByExecutor implements GroupByExecutor {
return;
}
+ Predicate<Long> boundPredicate = QueryUtils.getPredicate(curStartTime, curEndTime, ascending);
for (AggregateResult result : results) {
// current agg method has been calculated
if (result.hasFinalResult()) {
@@ -145,7 +146,7 @@ public class LocalGroupByExecutor implements GroupByExecutor {
}
if (batchIterator.hasNext()) {
- result.updateResultFromPageData(batchIterator, curStartTime, curEndTime);
+ result.updateResultFromPageData(batchIterator, boundPredicate);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
index 6604de1422..15dad4affb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
public class QueryUtils {
@@ -220,4 +221,12 @@ public class QueryUtils {
.forEach(item -> orderIndex[index.getAndIncrement()] = item.getKey());
dataSource.setUnSeqFileOrderIndex(orderIndex);
}
+
+ /**
+ * @return A predicate used to judge whether the current timestamp is out of the time range; it
+ * returns true if it is.
+ */
+ public static Predicate<Long> getPredicate(long minBound, long maxBound, boolean ascending) {
+ return ascending ? time -> time >= maxBound : time -> time < minBound;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
index d038b3467f..cb532b95bd 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -72,7 +73,7 @@ public class DescAggregateResultTest {
maxTimeDescAggrResult.updateResultFromPageData(it);
Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
it.reset();
- maxTimeDescAggrResult.updateResultFromPageData(it, 2, 5);
+ maxTimeDescAggrResult.updateResultFromPageData(it, QueryUtils.getPredicate(2, 5, false));
Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
}
@@ -109,7 +110,7 @@ public class DescAggregateResultTest {
minTimeDescAggrResult.updateResultFromPageData(it);
Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
it.reset();
- minTimeDescAggrResult.updateResultFromPageData(it, 1, 3);
+ minTimeDescAggrResult.updateResultFromPageData(it, QueryUtils.getPredicate(1, 3, false));
Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
}
@@ -147,7 +148,7 @@ public class DescAggregateResultTest {
firstValueDescAggrResult.updateResultFromPageData(it);
Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
it.reset();
- firstValueDescAggrResult.updateResultFromPageData(it, 1, 3);
+ firstValueDescAggrResult.updateResultFromPageData(it, QueryUtils.getPredicate(1, 3, false));
Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
}
@@ -184,7 +185,7 @@ public class DescAggregateResultTest {
lastValueDescAggrResult.updateResultFromPageData(it);
Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
it.reset();
- lastValueDescAggrResult.updateResultFromPageData(it, 3, 5);
+ lastValueDescAggrResult.updateResultFromPageData(it, QueryUtils.getPredicate(3, 5, false));
Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 3ff8c5f7ac..7b22ebf7d1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -37,6 +37,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Predicate;
/**
* <code>BatchData</code> is a self-defined data structure which is optimized for different type of
@@ -808,6 +809,11 @@ public class BatchData {
return hasNext();
}
+ @Override
+ public boolean hasNext(Predicate<Long> boundPredicate) {
+ return hasNext();
+ }
+
@Override
public void next() {
BatchData.this.next();
@@ -881,6 +887,17 @@ public class BatchData {
return BatchData.this.hasCurrent();
}
+ @Override
+ public boolean hasNext(Predicate<Long> boundPredicate) {
+ while (BatchData.this.hasCurrent() && currentValue() == null) {
+ if (boundPredicate.test(currentTime())) {
+ break;
+ }
+ super.next();
+ }
+ return BatchData.this.hasCurrent();
+ }
+
@Override
public Object currentValue() {
TsPrimitiveType v = getVector()[subIndex];
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java
index 0761c1d00c..ad2ef98765 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java
@@ -19,12 +19,24 @@
package org.apache.iotdb.tsfile.read.common;
+import java.util.function.Predicate;
+
public interface IBatchDataIterator {
boolean hasNext();
boolean hasNext(long minBound, long maxBound);
+ /**
+ * Determine whether there is a non-null value in the current time window. This method is used in
+ * GROUP BY aggregation queries.
+ *
+ * @param boundPredicate A predicate used to judge whether the current timestamp is out of the
+ * time range; it returns true if it is. This predicate guarantees that the current time of
+ * batchData will not go out of the time range when a sensor's values are all null.
+ */
+ boolean hasNext(Predicate<Long> boundPredicate);
+
void next();
long currentTime();