You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/12 03:34:53 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13] [ISSUE-5773] Fix missing data in group by query (#5796)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 8f46e1b939 [To rel/0.13] [ISSUE-5773] Fix missing data in group by query (#5796)
8f46e1b939 is described below

commit 8f46e1b93935ef6b5dcf86fd8ecf19a11f10b5bf
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Thu May 12 11:34:49 2022 +0800

    [To rel/0.13] [ISSUE-5773] Fix missing data in group by query (#5796)
---
 .../IoTDBGroupByQueryWithoutValueFilterIT.java     | 77 ++++++++++++++++++++++
 .../db/query/aggregation/AggregateResult.java      |  7 +-
 .../db/query/aggregation/impl/AvgAggrResult.java   | 11 ++--
 .../db/query/aggregation/impl/CountAggrResult.java |  9 ++-
 .../query/aggregation/impl/ExtremeAggrResult.java  | 12 ++--
 .../aggregation/impl/FirstValueAggrResult.java     | 10 +--
 .../aggregation/impl/FirstValueDescAggrResult.java |  8 +--
 .../aggregation/impl/LastValueAggrResult.java      | 10 +--
 .../aggregation/impl/LastValueDescAggrResult.java  |  8 +--
 .../query/aggregation/impl/MaxTimeAggrResult.java  | 10 +--
 .../aggregation/impl/MaxTimeDescAggrResult.java    |  8 +--
 .../query/aggregation/impl/MaxValueAggrResult.java | 10 +--
 .../query/aggregation/impl/MinTimeAggrResult.java  | 10 +--
 .../aggregation/impl/MinTimeDescAggrResult.java    |  6 +-
 .../query/aggregation/impl/MinValueAggrResult.java | 10 +--
 .../db/query/aggregation/impl/SumAggrResult.java   | 11 ++--
 .../groupby/LocalAlignedGroupByExecutor.java       | 43 +++++++-----
 .../dataset/groupby/LocalGroupByExecutor.java      |  9 +--
 .../java/org/apache/iotdb/db/utils/QueryUtils.java |  9 +++
 .../query/aggregation/DescAggregateResultTest.java |  9 +--
 .../apache/iotdb/tsfile/read/common/BatchData.java | 17 +++++
 .../tsfile/read/common/IBatchDataIterator.java     | 12 ++++
 22 files changed, 221 insertions(+), 95 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithoutValueFilterIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithoutValueFilterIT.java
index 81862999ab..a93a4771aa 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithoutValueFilterIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBGroupByQueryWithoutValueFilterIT.java
@@ -1088,6 +1088,83 @@ public class IoTDBGroupByQueryWithoutValueFilterIT {
     }
   }
 
+  @Test
+  public void groupByBigDataTest() throws SQLException {
+    String[] retArray =
+        new String[] {
+          "0,256,256,0.0,0.0,255.0,255.0",
+          "256,256,256,256.0,256.0,511.0,511.0",
+          "512,256,256,512.0,512.0,767.0,767.0",
+          "768,256,256,768.0,768.0,1023.0,1023.0",
+          "1024,256,256,1024.0,1024.0,1279.0,1279.0"
+        };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < 1281; i++) {
+        statement.execute(
+            String.format(
+                "insert into root.sg2.d1(time, s1, s2) aligned values(%s, %s,  %s)", i, i, i));
+      }
+      statement.execute("flush");
+
+      boolean hasResultSet =
+          statement.execute(
+              "select count(*), first_value(*), last_value(*) from root.sg2.d1 "
+                  + "GROUP BY ([0, 1280), 256ms)");
+      Assert.assertTrue(hasResultSet);
+
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(count("root.sg2.d1.s1"))
+                  + ","
+                  + resultSet.getString(count("root.sg2.d1.s2"))
+                  + ","
+                  + resultSet.getString(firstValue("root.sg2.d1.s1"))
+                  + ","
+                  + resultSet.getString(firstValue("root.sg2.d1.s2"))
+                  + ","
+                  + resultSet.getString(lastValue("root.sg2.d1.s1"))
+                  + ","
+                  + resultSet.getString(lastValue("root.sg2.d1.s2"));
+          Assert.assertEquals(retArray[cnt++], ans);
+        }
+        Assert.assertEquals(retArray.length, cnt);
+      }
+
+      hasResultSet =
+          statement.execute(
+              "select count(*), first_value(*), last_value(*) from root.sg2.d1 "
+                  + "GROUP BY ([0, 1280), 256ms) order by time desc");
+      Assert.assertTrue(hasResultSet);
+
+      cnt = retArray.length - 1;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(count("root.sg2.d1.s1"))
+                  + ","
+                  + resultSet.getString(count("root.sg2.d1.s2"))
+                  + ","
+                  + resultSet.getString(firstValue("root.sg2.d1.s1"))
+                  + ","
+                  + resultSet.getString(firstValue("root.sg2.d1.s2"))
+                  + ","
+                  + resultSet.getString(lastValue("root.sg2.d1.s1"))
+                  + ","
+                  + resultSet.getString(lastValue("root.sg2.d1.s2"));
+          Assert.assertEquals(retArray[cnt--], ans);
+        }
+        Assert.assertEquals(0, cnt + 1);
+      }
+    }
+  }
+
   @Test
   public void groupByWithoutAggregationFuncTest() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7daa1e8d60..8719a91dee 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public abstract class AggregateResult {
 
@@ -84,11 +85,11 @@ public abstract class AggregateResult {
    * This method is used in GROUP BY aggregation query.
    *
    * @param batchIterator the data in Page
-   * @param minBound calculate points whose time >= bound
-   * @param maxBound calculate points whose time < bound
+   * @param boundPredicate used to judge whether the current timestamp is out of time range, returns
+   *     true if it is.
    */
   public abstract void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) throws IOException;
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) throws IOException;
 
   /**
    * This method calculates the aggregation using common timestamps of the cross series filter.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 49fc5efa46..7046559cf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public class AvgAggrResult extends AggregateResult {
 
@@ -84,16 +85,14 @@ public class AvgAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(IBatchDataIterator batchIterator) {
-    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+    updateResultFromPageData(batchIterator, time -> false);
   }
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
-    while (batchIterator.hasNext(minBound, maxBound)) {
-      if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
-        break;
-      }
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+    while (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       updateAvg(seriesDataType, batchIterator.currentValue());
       batchIterator.next();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index ec279729f9..d1745e2d79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public class CountAggrResult extends AggregateResult {
 
@@ -56,12 +57,10 @@ public class CountAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
     int cnt = 0;
-    while (batchIterator.hasNext(minBound, maxBound)) {
-      if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
-        break;
-      }
+    while (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       cnt++;
       batchIterator.next();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
index 8b40461331..3fa84bad55 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public class ExtremeAggrResult extends AggregateResult {
 
@@ -95,18 +96,15 @@ public class ExtremeAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(IBatchDataIterator batchIterator) {
-    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+    updateResultFromPageData(batchIterator, time -> false);
   }
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
     Comparable<Object> extVal = null;
-
-    while (batchIterator.hasNext(minBound, maxBound)
-        && batchIterator.currentTime() < maxBound
-        && batchIterator.currentTime() >= minBound) {
-
+    while (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       extVal = getExtremeValue(extVal, (Comparable<Object>) batchIterator.currentValue());
       batchIterator.next();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index c774636e57..97ad1e04b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public class FirstValueAggrResult extends AggregateResult {
 
@@ -66,18 +67,17 @@ public class FirstValueAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(IBatchDataIterator batchIterator) {
-    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+    updateResultFromPageData(batchIterator, time -> false);
   }
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
     if (hasFinalResult()) {
       return;
     }
-    if (batchIterator.hasNext(minBound, maxBound)
-        && batchIterator.currentTime() < maxBound
-        && batchIterator.currentTime() >= minBound) {
+    if (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       setValue(batchIterator.currentValue());
       timestamp = batchIterator.currentTime();
       batchIterator.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 2c2e60d94a..3be3f9eaa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
+import java.util.function.Predicate;
 
 public class FirstValueDescAggrResult extends FirstValueAggrResult {
 
@@ -42,10 +43,9 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
-    while (batchIterator.hasNext(minBound, maxBound)
-        && batchIterator.currentTime() < maxBound
-        && batchIterator.currentTime() >= minBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+    while (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       setValue(batchIterator.currentValue());
       timestamp = batchIterator.currentTime();
       batchIterator.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 7fab03c3aa..7309cd5f05 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public class LastValueAggrResult extends AggregateResult {
 
@@ -62,17 +63,16 @@ public class LastValueAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(IBatchDataIterator batchIterator) {
-    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+    updateResultFromPageData(batchIterator, time -> false);
   }
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
     long time = Long.MIN_VALUE;
     Object lastVal = null;
-    while (batchIterator.hasNext(minBound, maxBound)
-        && batchIterator.currentTime() < maxBound
-        && batchIterator.currentTime() >= minBound) {
+    while (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       time = batchIterator.currentTime();
       lastVal = batchIterator.currentValue();
       batchIterator.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 6fb635543b..4bd7153e03 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
+import java.util.function.Predicate;
 
 public class LastValueDescAggrResult extends LastValueAggrResult {
 
@@ -44,15 +45,14 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
     if (hasFinalResult()) {
       return;
     }
     long time = Long.MIN_VALUE;
     Object lastVal = null;
-    if (batchIterator.hasNext(minBound, maxBound)
-        && batchIterator.currentTime() < maxBound
-        && batchIterator.currentTime() >= minBound) {
+    if (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       time = batchIterator.currentTime();
       lastVal = batchIterator.currentValue();
       batchIterator.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index c98ed969a1..58b544164a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public class MaxTimeAggrResult extends AggregateResult {
 
@@ -51,15 +52,14 @@ public class MaxTimeAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(IBatchDataIterator batchIterator) {
-    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+    updateResultFromPageData(batchIterator, time -> false);
   }
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
-    while (batchIterator.hasNext(minBound, maxBound)
-        && batchIterator.currentTime() < maxBound
-        && batchIterator.currentTime() >= minBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+    while (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       updateMaxTimeResult(batchIterator.currentTime());
       batchIterator.next();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index c4992542e6..b0d8e66ed6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
+import java.util.function.Predicate;
 
 public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
 
@@ -37,13 +38,12 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
     if (hasFinalResult()) {
       return;
     }
-    if (batchIterator.hasNext(minBound, maxBound)
-        && batchIterator.currentTime() < maxBound
-        && batchIterator.currentTime() >= minBound) {
+    if (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       updateMaxTimeResult(batchIterator.currentTime());
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index 8ee636daea..7f9c5b8e43 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public class MaxValueAggrResult extends AggregateResult {
 
@@ -51,17 +52,16 @@ public class MaxValueAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(IBatchDataIterator batchIterator) {
-    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+    updateResultFromPageData(batchIterator, time -> false);
   }
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
     Comparable<Object> maxVal = null;
 
-    while (batchIterator.hasNext(minBound, maxBound)
-        && batchIterator.currentTime() < maxBound
-        && batchIterator.currentTime() >= minBound) {
+    while (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       if (maxVal == null || maxVal.compareTo(batchIterator.currentValue()) < 0) {
         maxVal = (Comparable<Object>) batchIterator.currentValue();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 9521e363cd..630ef4bfbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public class MinTimeAggrResult extends AggregateResult {
 
@@ -54,18 +55,17 @@ public class MinTimeAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(IBatchDataIterator batchIterator) {
-    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+    updateResultFromPageData(batchIterator, time -> false);
   }
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
     if (hasFinalResult()) {
       return;
     }
-    if (batchIterator.hasNext(minBound, maxBound)
-        && batchIterator.currentTime() < maxBound
-        && batchIterator.currentTime() >= minBound) {
+    if (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       setLongValue(batchIterator.currentTime());
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 4a9ae2c909..84bb1a231e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
 import java.io.IOException;
+import java.util.function.Predicate;
 
 public class MinTimeDescAggrResult extends MinTimeAggrResult {
 
@@ -35,8 +36,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
-    while (batchIterator.hasNext(minBound, maxBound) && batchIterator.currentTime() >= minBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+    while (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       setValue(batchIterator.currentTime());
       batchIterator.next();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index aa06d78075..8f2bd7ff6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public class MinValueAggrResult extends AggregateResult {
 
@@ -51,15 +52,14 @@ public class MinValueAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(IBatchDataIterator batchIterator) {
-    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+    updateResultFromPageData(batchIterator, time -> false);
   }
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
-    while (batchIterator.hasNext(minBound, maxBound)
-        && batchIterator.currentTime() < maxBound
-        && batchIterator.currentTime() >= minBound) {
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+    while (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       updateResult((Comparable<Object>) batchIterator.currentValue());
       batchIterator.next();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index c35fbfc050..0f10d0e84f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Predicate;
 
 public class SumAggrResult extends AggregateResult {
 
@@ -65,16 +66,14 @@ public class SumAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromPageData(IBatchDataIterator batchIterator) {
-    updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE);
+    updateResultFromPageData(batchIterator, time -> false);
   }
 
   @Override
   public void updateResultFromPageData(
-      IBatchDataIterator batchIterator, long minBound, long maxBound) {
-    while (batchIterator.hasNext(minBound, maxBound)) {
-      if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
-        break;
-      }
+      IBatchDataIterator batchIterator, Predicate<Long> boundPredicate) {
+    while (batchIterator.hasNext(boundPredicate)
+        && !boundPredicate.test(batchIterator.currentTime())) {
       updateSum(batchIterator.currentValue());
       batchIterator.next();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
index bebb354f14..87faffebbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader;
+import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -41,6 +42,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Predicate;
 
 public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
 
@@ -58,8 +60,6 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
 
   private final boolean ascending;
 
-  private final QueryDataSource queryDataSource;
-
   public LocalAlignedGroupByExecutor(
       PartialPath path,
       QueryContext context,
@@ -67,7 +67,7 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
       TsFileFilter fileFilter,
       boolean ascending)
       throws StorageEngineException, QueryProcessException {
-    queryDataSource =
+    QueryDataSource queryDataSource =
         QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter, ascending);
     // update filter by TTL
     timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
@@ -245,9 +245,7 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
 
       // set initial Index
       lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
-      ;
       lastReadCurListIndex = batchData.getReadCurListIndex();
-      ;
 
       // stop calc and cached current batchData
       if (ascending && batchData.currentTime() >= curEndTime) {
@@ -289,37 +287,50 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
 
     boolean hasCached = false;
     int curReadCurArrayIndex = lastReadCurArrayIndex;
+    int curReadCurListIndex = lastReadCurListIndex;
+    Predicate<Long> iteratorPredicate =
+        QueryUtils.getPredicate(curStartTime, curEndTime, ascending);
     while (reader.hasNextSubSeries()) {
       int subIndex = reader.getCurIndex();
-      batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
       List<AggregateResult> aggregateResultList = results.get(subIndex);
       for (AggregateResult result : aggregateResultList) {
         // current agg method has been calculated
         if (result.hasFinalResult()) {
           continue;
         }
-        // lazy reset batch data for calculation
+        // reset batch data for calculation
         batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
         IBatchDataIterator batchDataIterator = batchData.getBatchDataIterator(subIndex);
         if (ascending) {
           // skip points that cannot be calculated
-          while (batchDataIterator.hasNext(curStartTime, curEndTime)
+          while (batchDataIterator.hasNext(iteratorPredicate)
               && batchDataIterator.currentTime() < curStartTime) {
             batchDataIterator.next();
           }
         } else {
-          while (batchDataIterator.hasNext(curStartTime, curEndTime)
+          while (batchDataIterator.hasNext(iteratorPredicate)
               && batchDataIterator.currentTime() >= curEndTime) {
             batchDataIterator.next();
           }
         }
-        if (batchDataIterator.hasNext(curStartTime, curEndTime)) {
-          result.updateResultFromPageData(batchDataIterator, curStartTime, curEndTime);
+        if (batchDataIterator.hasNext(iteratorPredicate)) {
+          result.updateResultFromPageData(batchDataIterator, iteratorPredicate);
+        }
+        if (ascending) {
+          if (batchData.getReadCurListIndex() > curReadCurListIndex) {
+            curReadCurListIndex = batchData.getReadCurListIndex();
+            curReadCurArrayIndex = batchData.getReadCurArrayIndex();
+          } else if (batchData.getReadCurListIndex() == curReadCurListIndex) {
+            curReadCurArrayIndex = Math.max(batchData.getReadCurArrayIndex(), curReadCurArrayIndex);
+          }
+        } else {
+          if (batchData.getReadCurListIndex() < curReadCurListIndex) {
+            curReadCurListIndex = batchData.getReadCurListIndex();
+            curReadCurArrayIndex = batchData.getReadCurArrayIndex();
+          } else if (batchData.getReadCurListIndex() == curReadCurListIndex) {
+            curReadCurArrayIndex = Math.min(batchData.getReadCurArrayIndex(), curReadCurArrayIndex);
+          }
         }
-        curReadCurArrayIndex =
-            ascending
-                ? Math.max(curReadCurArrayIndex, batchData.getReadCurArrayIndex())
-                : Math.min(curReadCurArrayIndex, batchData.getReadCurArrayIndex());
       }
       // can calc for next interval
       if (!hasCached && batchData.hasCurrent()) {
@@ -331,7 +342,7 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
 
     // reset the last position to current Index
     lastReadCurArrayIndex = curReadCurArrayIndex;
-    lastReadCurListIndex = batchData.getReadCurListIndex();
+    lastReadCurListIndex = curReadCurListIndex;
     batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
index 22563f8823..fd692c410a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.db.query.reader.series.IAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
+import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -40,6 +41,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Predicate;
 
 public class LocalGroupByExecutor implements GroupByExecutor {
 
@@ -55,8 +57,6 @@ public class LocalGroupByExecutor implements GroupByExecutor {
   private int lastReadCurListIndex;
   private final boolean ascending;
 
-  private final QueryDataSource queryDataSource;
-
   public LocalGroupByExecutor(
       PartialPath path,
       Set<String> allSensors,
@@ -65,7 +65,7 @@ public class LocalGroupByExecutor implements GroupByExecutor {
       TsFileFilter fileFilter,
       boolean ascending)
       throws StorageEngineException, QueryProcessException {
-    queryDataSource =
+    QueryDataSource queryDataSource =
         QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter, ascending);
     // update filter by TTL
     timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
@@ -125,6 +125,7 @@ public class LocalGroupByExecutor implements GroupByExecutor {
       return;
     }
 
+    Predicate<Long> boundPredicate = QueryUtils.getPredicate(curStartTime, curEndTime, ascending);
     for (AggregateResult result : results) {
       // current agg method has been calculated
       if (result.hasFinalResult()) {
@@ -145,7 +146,7 @@ public class LocalGroupByExecutor implements GroupByExecutor {
       }
 
       if (batchIterator.hasNext()) {
-        result.updateResultFromPageData(batchIterator, curStartTime, curEndTime);
+        result.updateResultFromPageData(batchIterator, boundPredicate);
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
index 6604de1422..15dad4affb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 public class QueryUtils {
@@ -220,4 +221,12 @@ public class QueryUtils {
         .forEach(item -> orderIndex[index.getAndIncrement()] = item.getKey());
     dataSource.setUnSeqFileOrderIndex(orderIndex);
   }
+
+  /**
+   * @return a predicate that is true once a timestamp has left the window in scan direction:
+   *     {@code time >= maxBound} when ascending, {@code time < minBound} when descending.
+   */
+  public static Predicate<Long> getPredicate(long minBound, long maxBound, boolean ascending) {
+    return ascending ? time -> time >= maxBound : time -> time < minBound;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
index d038b3467f..cb532b95bd 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/DescAggregateResultTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -72,7 +73,7 @@ public class DescAggregateResultTest {
     maxTimeDescAggrResult.updateResultFromPageData(it);
     Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
     it.reset();
-    maxTimeDescAggrResult.updateResultFromPageData(it, 2, 5);
+    maxTimeDescAggrResult.updateResultFromPageData(it, QueryUtils.getPredicate(2, 5, false));
     Assert.assertEquals(5L, maxTimeDescAggrResult.getResult());
   }
 
@@ -109,7 +110,7 @@ public class DescAggregateResultTest {
     minTimeDescAggrResult.updateResultFromPageData(it);
     Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
     it.reset();
-    minTimeDescAggrResult.updateResultFromPageData(it, 1, 3);
+    minTimeDescAggrResult.updateResultFromPageData(it, QueryUtils.getPredicate(1, 3, false));
     Assert.assertEquals(1L, minTimeDescAggrResult.getResult());
   }
 
@@ -147,7 +148,7 @@ public class DescAggregateResultTest {
     firstValueDescAggrResult.updateResultFromPageData(it);
     Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
     it.reset();
-    firstValueDescAggrResult.updateResultFromPageData(it, 1, 3);
+    firstValueDescAggrResult.updateResultFromPageData(it, QueryUtils.getPredicate(1, 3, false));
     Assert.assertTrue((boolean) firstValueDescAggrResult.getResult());
   }
 
@@ -184,7 +185,7 @@ public class DescAggregateResultTest {
     lastValueDescAggrResult.updateResultFromPageData(it);
     Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
     it.reset();
-    lastValueDescAggrResult.updateResultFromPageData(it, 3, 5);
+    lastValueDescAggrResult.updateResultFromPageData(it, QueryUtils.getPredicate(3, 5, false));
     Assert.assertEquals("e", ((Binary) lastValueDescAggrResult.getResult()).getStringValue());
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 3ff8c5f7ac..7b22ebf7d1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -37,6 +37,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Predicate;
 
 /**
  * <code>BatchData</code> is a self-defined data structure which is optimized for different type of
@@ -808,6 +809,11 @@ public class BatchData {
       return hasNext();
     }
 
+    @Override
+    public boolean hasNext(Predicate<Long> boundPredicate) {
+      return hasNext(); // boundPredicate is not consulted by this iterator
+    }
+
     @Override
     public void next() {
       BatchData.this.next();
@@ -881,6 +887,17 @@ public class BatchData {
       return BatchData.this.hasCurrent();
     }
 
+    @Override
+    public boolean hasNext(Predicate<Long> boundPredicate) {
+      while (BatchData.this.hasCurrent() && currentValue() == null) { // skip null values
+        if (boundPredicate.test(currentTime())) { // out of range: stop without consuming
+          break;
+        }
+        super.next();
+      }
+      return BatchData.this.hasCurrent(); // still true when the loop stopped at the bound
+    }
+
     @Override
     public Object currentValue() {
       TsPrimitiveType v = getVector()[subIndex];
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java
index 0761c1d00c..ad2ef98765 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IBatchDataIterator.java
@@ -19,12 +19,24 @@
 
 package org.apache.iotdb.tsfile.read.common;
 
+import java.util.function.Predicate;
+
 public interface IBatchDataIterator {
 
   boolean hasNext();
 
   boolean hasNext(long minBound, long maxBound);
 
+  /**
+   * Determines whether there is a non-null value in the current time window. This method is used
+   * in GROUP BY aggregation queries.
+   *
+   * @param boundPredicate a predicate that judges whether the current timestamp is out of the
+   *     time range and returns true if it is. It guarantees that the current time of batchData
+   *     will not move out of the time range when a sensor's values are all null.
+   */
+  boolean hasNext(Predicate<Long> boundPredicate);
+
   void next();
 
   long currentTime();