You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/12/01 01:04:41 UTC

[iotdb] branch master updated: [IOTDB-1940] Support aggregation query with value filter in new vector (#4470)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f43085e  [IOTDB-1940] Support aggregation query with value filter in new vector (#4470)
f43085e is described below

commit f43085ee9c583e319cc621177aa3e95f14502715
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Dec 1 09:04:08 2021 +0800

    [IOTDB-1940] Support aggregation query with value filter in new vector (#4470)
---
 .../db/query/aggregation/AggregateResult.java      |   4 +-
 .../db/query/aggregation/impl/AvgAggrResult.java   |   9 +-
 .../db/query/aggregation/impl/CountAggrResult.java |  11 +-
 .../query/aggregation/impl/ExtremeAggrResult.java  |   7 +-
 .../aggregation/impl/FirstValueAggrResult.java     |  11 +-
 .../aggregation/impl/FirstValueDescAggrResult.java |   7 +-
 .../aggregation/impl/LastValueAggrResult.java      |   7 +-
 .../aggregation/impl/LastValueDescAggrResult.java  |  12 +-
 .../query/aggregation/impl/MaxTimeAggrResult.java  |   5 +-
 .../aggregation/impl/MaxTimeDescAggrResult.java    |  10 +-
 .../query/aggregation/impl/MaxValueAggrResult.java |  10 +-
 .../query/aggregation/impl/MinTimeAggrResult.java  |  10 +-
 .../aggregation/impl/MinTimeDescAggrResult.java    |   5 +-
 .../query/aggregation/impl/MinValueAggrResult.java |  10 +-
 .../db/query/aggregation/impl/SumAggrResult.java   |   9 +-
 .../db/query/executor/AggregationExecutor.java     | 116 ++++++---
 .../iotdb/db/utils/AlignedValueIterator.java       |  58 +++++
 .../org/apache/iotdb/db/utils/ValueIterator.java   |  56 +++++
 .../IoTDBAggregationWithValueFilter2IT.java        |  63 +++++
 .../aligned/IoTDBAggregationWithValueFilterIT.java | 262 +++++++++++++++++++++
 ...BAggregationWithValueFilterWithDeletion2IT.java |  83 +++++++
 ...DBAggregationWithValueFilterWithDeletionIT.java | 219 +++++++++++++++++
 22 files changed, 886 insertions(+), 98 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index efd9aee..7daa1e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -98,7 +99,8 @@ public abstract class AggregateResult {
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException;
 
   /** This method calculates the aggregation using values that have been calculated */
-  public abstract void updateResultUsingValues(long[] timestamps, int length, Object[] values);
+  public abstract void updateResultUsingValues(
+      long[] timestamps, int length, ValueIterator valueIterator);
 
   /**
    * Judge if aggregation results have been calculated. In other words, if the aggregated result
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index a1fffa6..aebe259 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -110,11 +111,9 @@ public class AvgAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        updateAvg(seriesDataType, values[i]);
-      }
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+    while (valueIterator.hasNext()) {
+      updateAvg(seriesDataType, valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 57a3a7e..971c76a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -57,7 +58,6 @@ public class CountAggrResult extends AggregateResult {
   public void updateResultFromPageData(
       IBatchDataIterator batchIterator, long minBound, long maxBound) {
     int cnt = 0;
-    int count = batchIterator.totalLength();
     while (batchIterator.hasNext()) {
       if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
         break;
@@ -82,12 +82,11 @@ public class CountAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     int cnt = 0;
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        cnt++;
-      }
+    while (valueIterator.hasNext()) {
+      valueIterator.next();
+      cnt++;
     }
     setLongValue(getLongValue() + cnt);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
index 144fe00..0a0c165 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -125,10 +126,10 @@ public class ExtremeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     Comparable<Object> extVal = null;
-    for (int i = 0; i < length; i++) {
-      extVal = getExtremeValue(extVal, (Comparable<Object>) values[i]);
+    while (valueIterator.hasNext()) {
+      extVal = getExtremeValue(extVal, (Comparable<Object>) valueIterator.next());
     }
     updateResult(extVal);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 233abdc..be5bd9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -107,16 +108,12 @@ public class FirstValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        setValue(values[i]);
-        timestamp = timestamps[i];
-        break;
-      }
+    if (valueIterator.hasNext()) {
+      setValue(valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 3092818..afcd23f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -65,10 +66,10 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
-        setValue(values[i]);
+      if (valueIterator.get(i) != null) {
+        setValue(valueIterator.get(i));
         timestamp = timestamps[i];
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 443751c..d369f79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -97,11 +98,11 @@ public class LastValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
+      if (valueIterator.get(i) != null) {
         timestamp = timestamps[i];
-        setValue(values[i]);
+        setValue(valueIterator.get(i));
         return;
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 981167b..7bc236c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -87,16 +88,13 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        timestamp = timestamps[i];
-        setValue(values[i]);
-        return;
-      }
+    if (valueIterator.hasNext()) {
+      timestamp = timestamps[valueIterator.getCurPos()];
+      setValue(valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 46ebe15..d990b8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -77,9 +78,9 @@ public class MaxTimeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
+      if (valueIterator.get(i) != null) {
         updateMaxTimeResult(timestamps[i]);
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index e867bf7..1dfc1ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
@@ -70,15 +71,12 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        updateMaxTimeResult(timestamps[i]);
-        return;
-      }
+    if (valueIterator.hasNext()) {
+      updateMaxTimeResult(timestamps[valueIterator.getCurPos()]);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index a61583b..f939d9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -83,11 +84,12 @@ public class MaxValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     Comparable<Object> maxVal = null;
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
-        maxVal = (Comparable<Object>) values[i];
+    while (valueIterator.hasNext()) {
+      Object value = valueIterator.next();
+      if (maxVal == null || maxVal.compareTo(value) < 0) {
+        maxVal = (Comparable<Object>) value;
       }
     }
     updateResult(maxVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 4d0365f..0cf2205 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -92,15 +93,12 @@ public class MinTimeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        setLongValue(timestamps[i]);
-        return;
-      }
+    if (valueIterator.hasNext()) {
+      setLongValue(timestamps[valueIterator.getCurPos()]);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 071d3d2..6fe0fdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
@@ -54,9 +55,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
+      if (valueIterator.get(i) != null) {
         setLongValue(timestamps[i]);
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index f3c01ed..8b9e1e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -78,11 +79,12 @@ public class MinValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     Comparable<Object> minVal = null;
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
-        minVal = (Comparable<Object>) values[i];
+    while (valueIterator.hasNext()) {
+      Object value = valueIterator.next();
+      if (minVal == null || minVal.compareTo(value) > 0) {
+        minVal = (Comparable<Object>) value;
       }
     }
     updateResult(minVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index a047c33..46cd1b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
@@ -91,11 +92,9 @@ public class SumAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        updateSum(values[i]);
-      }
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+    while (valueIterator.hasNext()) {
+      updateSum(valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 53c218a..1b10d53 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -44,7 +44,9 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.utils.AlignedValueIterator;
 import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -55,10 +57,12 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -82,7 +86,10 @@ public class AggregationExecutor {
   private int aggregateFetchSize;
 
   protected AggregationExecutor(QueryContext context, AggregationPlan aggregationPlan) {
-    this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
+    this.selectedSeries = new ArrayList<>();
+    aggregationPlan
+        .getDeduplicatedPaths()
+        .forEach(k -> selectedSeries.add(((MeasurementPath) k).transformToExactPath()));
     this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
     this.aggregations = aggregationPlan.getDeduplicatedAggregations();
     this.expression = aggregationPlan.getExpression();
@@ -588,18 +595,26 @@ public class AggregationExecutor {
     // group by path name
     Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
         groupAggregationsBySeries(selectedSeries);
-    Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
+    Map<PartialPath, List<List<Integer>>> vectorPathIndexesMap =
+        groupVectorSeries(pathToAggrIndexesMap);
+    Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap = new HashMap<>();
     List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
+
     try {
-      for (int i = 0; i < selectedSeries.size(); i++) {
-        PartialPath path = selectedSeries.get(i);
-        List<Integer> indexes = pathToAggrIndexesMap.remove(path);
-        if (indexes != null) {
-          IReaderByTimestamp seriesReaderByTimestamp =
-              getReaderByTime(path, queryPlan, dataTypes.get(i), context);
-          readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
-        }
+      for (PartialPath path : pathToAggrIndexesMap.keySet()) {
+        IReaderByTimestamp seriesReaderByTimestamp =
+            getReaderByTime(path, queryPlan, path.getSeriesType(), context);
+        readerToAggrIndexesMap.put(
+            seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path)));
+      }
+      // assign null to be friendly for GC
+      pathToAggrIndexesMap = null;
+      for (PartialPath vectorPath : vectorPathIndexesMap.keySet()) {
+        IReaderByTimestamp seriesReaderByTimestamp =
+            getReaderByTime(vectorPath, queryPlan, vectorPath.getSeriesType(), context);
+        readerToAggrIndexesMap.put(seriesReaderByTimestamp, vectorPathIndexesMap.get(vectorPath));
       }
+      vectorPathIndexesMap = null;
     } finally {
       StorageEngine.getInstance().mergeUnLock(list);
     }
@@ -649,7 +664,7 @@ public class AggregationExecutor {
   /** calculate aggregation result with value filter. */
   private void aggregateWithValueFilter(
       TimeGenerator timestampGenerator,
-      Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
+      Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap)
       throws IOException {
     List<Boolean> cached =
         markFilterdPaths(
@@ -668,23 +683,40 @@ public class AggregationExecutor {
       }
 
       // cal part of aggregate result
-      for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
-        int pathId = entry.getValue().get(0);
-        // cache in timeGenerator
-        if (cached.get(pathId)) {
-          Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
-          for (Integer i : entry.getValue()) {
-            aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
+      for (Entry<IReaderByTimestamp, List<List<Integer>>> entry :
+          readerToAggrIndexesMap.entrySet()) {
+        // use cache data as much as possible
+        boolean[] cachedOrNot = new boolean[entry.getValue().size()];
+        for (int i = 0; i < entry.getValue().size(); i++) {
+          List<Integer> subIndexes = entry.getValue().get(i);
+          int pathId = subIndexes.get(0);
+          // if cached in timeGenerator
+          if (cached.get(pathId)) {
+            Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
+            ValueIterator valueIterator = generateValueIterator(values);
+            for (Integer index : subIndexes) {
+              aggregateResultList[index].updateResultUsingValues(
+                  timeArray, timeArrayLength, valueIterator);
+              valueIterator.reset();
+            }
+            cachedOrNot[i] = true;
           }
-        } else {
-          if (entry.getValue().size() == 1) {
-            aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps(
-                timeArray, timeArrayLength, entry.getKey());
-          } else {
-            Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
-            if (values != null) {
-              for (Integer i : entry.getValue()) {
-                aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
+        }
+
+        if (hasRemaining(cachedOrNot)) {
+          // TODO: if we only need to get firstValue/minTime that's not need to traverse all values,
+          //  it's enough to get the exact number of values for these specific aggregate func
+          Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+          if (values != null) {
+            ValueIterator valueIterator = generateValueIterator(values);
+            for (int i = 0; i < entry.getValue().size(); i++) {
+              if (!cachedOrNot[i]) {
+                valueIterator.setSubMeasurementIndex(i);
+                for (Integer index : entry.getValue().get(i)) {
+                  aggregateResultList[index].updateResultUsingValues(
+                      timeArray, timeArrayLength, valueIterator);
+                  valueIterator.reset();
+                }
               }
             }
           }
@@ -693,6 +725,24 @@ public class AggregationExecutor {
     }
   }
 
+  private ValueIterator generateValueIterator(Object[] values) {
+    if (values[0] instanceof TsPrimitiveType[]) {
+      return new AlignedValueIterator(values);
+    } else {
+      return new ValueIterator(values);
+    }
+  }
+
+  /** Return whether there is result that has not been cached */
+  private boolean hasRemaining(boolean[] cachedOrNot) {
+    for (int i = 0; i < cachedOrNot.length; i++) {
+      if (!cachedOrNot[i]) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * using aggregate result data list construct QueryDataSet.
    *
@@ -755,21 +805,19 @@ public class AggregationExecutor {
 
     List<PartialPath> seriesPaths = new ArrayList<>(pathToAggrIndexesMap.keySet());
     for (PartialPath seriesPath : seriesPaths) {
-      if (seriesPath instanceof MeasurementPath
-          && ((MeasurementPath) seriesPath).isUnderAlignedEntity()) {
+      if (seriesPath instanceof AlignedPath) {
         List<Integer> indexes = pathToAggrIndexesMap.remove(seriesPath);
-        AlignedPath exactPath = (AlignedPath) ((MeasurementPath) seriesPath).transformToExactPath();
-        AlignedPath groupPath = temp.get(exactPath.getFullPath());
+        AlignedPath groupPath = temp.get(seriesPath.getFullPath());
         if (groupPath == null) {
-          groupPath = exactPath;
+          groupPath = (AlignedPath) seriesPath;
           temp.put(groupPath.getFullPath(), groupPath);
           result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes);
         } else {
           // groupPath is changed here so we update it
           List<List<Integer>> subIndexes = result.remove(groupPath);
           subIndexes.add(indexes);
-          groupPath.addMeasurements(exactPath.getMeasurementList());
-          groupPath.addSchemas(exactPath.getSchemaList());
+          groupPath.addMeasurements(((AlignedPath) seriesPath).getMeasurementList());
+          groupPath.addSchemas(((AlignedPath) seriesPath).getSchemaList());
           result.put(groupPath, subIndexes);
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
new file mode 100644
index 0000000..5d34d4d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+/** Used for value object is instance of TsPrimitiveType[] */
+public class AlignedValueIterator extends ValueIterator {
+
+  int subMeasurementIndex;
+
+  public AlignedValueIterator(Object[] values) {
+    super(values);
+  }
+
+  public void setSubMeasurementIndex(int subMeasurementIndex) {
+    this.subMeasurementIndex = subMeasurementIndex;
+  }
+
+  @Override
+  public boolean hasNext() {
+    while (curPos < values.length
+        && ((TsPrimitiveType[]) values[curPos])[subMeasurementIndex] == null) {
+      curPos++;
+    }
+    return curPos < values.length;
+  }
+
+  @Override
+  public Object next() {
+    return ((TsPrimitiveType[]) values[curPos++])[subMeasurementIndex].getValue();
+  }
+
+  @Override
+  public Object get(int index) {
+    if (((TsPrimitiveType[]) values[index])[subMeasurementIndex] == null) {
+      return null;
+    }
+    return ((TsPrimitiveType[]) values[index])[subMeasurementIndex].getValue();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java
new file mode 100644
index 0000000..d870f7f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+public class ValueIterator {
+
+  // Object: TsPrimitiveType[] or common data type
+  protected Object[] values;
+  protected int curPos = 0;
+
+  public ValueIterator(Object[] values) {
+    this.values = values;
+  }
+
+  public boolean hasNext() {
+    while (curPos < values.length && values[curPos] == null) {
+      curPos++;
+    }
+    return curPos < values.length;
+  }
+
+  public void setSubMeasurementIndex(int subMeasurementIndex) {}
+
+  public Object next() {
+    return values[curPos++];
+  }
+
+  public Object get(int index) {
+    return values[index];
+  }
+
+  public int getCurPos() {
+    return curPos;
+  }
+
+  public void reset() {
+    this.curPos = 0;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilter2IT.java b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilter2IT.java
new file mode 100644
index 0000000..40986d9
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilter2IT.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/** Let One chunk has more than one page. */
+public class IoTDBAggregationWithValueFilter2IT extends IoTDBAggregationWithValueFilterIT {
+  private static int numOfPointsPerPage;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    // TODO When the aligned time series support compaction, we need to set compaction to true
+    enableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+    numOfPointsPerPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(3);
+    AlignedWriteUtil.insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+    EnvironmentUtils.cleanEnv();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterIT.java
new file mode 100644
index 0000000..75f7882
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterIT.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBAggregationWithValueFilterIT {
+
+  private static final double DELTA = 1e-6;
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    // TODO When the aligned time series support compaction, we need to set compaction to true
+    enableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+    AlignedWriteUtil.insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void countAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"11"};
+    String[] columnNames = {"count(root.sg1.d1.s4)"};
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select count(s4) from root.sg1.d1 where s4 = true");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void aggregationFuncAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"8", "42.0", "5.25", "1.0", "9.0", "1", "9", "1.0", "9.0"};
+    String[] columnNames = {
+      "count(root.sg1.d1.s1)",
+      "sum(root.sg1.d1.s1)",
+      "avg(root.sg1.d1.s1)",
+      "first_value(root.sg1.d1.s1)",
+      "last_value(root.sg1.d1.s1)",
+      "min_time(root.sg1.d1.s1)",
+      "max_time(root.sg1.d1.s1)",
+      "min_value(root.sg1.d1.s1)",
+      "max_value(root.sg1.d1.s1)",
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select count(s1), sum(s1), avg(s1), "
+                  + "first_value(s1), last_value(s1), "
+                  + "min_time(s1), max_time(s1),"
+                  + "max_value(s1), min_value(s1) from root.sg1.d1 where s1 < 10");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void countAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"6", "6", "9", "11", "6"};
+    String[] columnNames = {
+      "count(root.sg1.d1.s1)",
+      "count(root.sg1.d1.s2)",
+      "count(root.sg1.d1.s3)",
+      "count(root.sg1.d1.s4)",
+      "count(root.sg1.d1.s5)"
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select count(*) from root.sg1.d1 where s4 = true");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void aggregationAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"160016.0", "11", "1", "13"};
+    String[] columnNames = {
+      "sum(root.sg1.d1.s1)",
+      "count(root.sg1.d1.s4)",
+      "min_value(root.sg1.d1.s3)",
+      "max_time(root.sg1.d1.s2)",
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select sum(s1), count(s4), min_value(s3), max_time(s2) from root.sg1.d1 where s4 = true");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletion2IT.java b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletion2IT.java
new file mode 100644
index 0000000..85ddd6b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletion2IT.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+/** Let One chunk has more than one page. */
+public class IoTDBAggregationWithValueFilterWithDeletion2IT
+    extends IoTDBAggregationWithValueFilterWithDeletionIT {
+
+  private static int numOfPointsPerPage;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+
+    enableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+    numOfPointsPerPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(3);
+    AlignedWriteUtil.insertData();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      // TODO currently aligned data in memory doesn't support deletion, so we flush all data to
+      // disk before doing deletion
+      statement.execute("flush");
+      statement.execute("delete timeseries root.sg1.d1.s2");
+      statement.execute("delete from root.sg1.d1.s1 where time <= 21");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+    EnvironmentUtils.cleanEnv();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletionIT.java
new file mode 100644
index 0000000..fb9fff0
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletionIT.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBAggregationWithValueFilterWithDeletionIT {
+
+  private static final double DELTA = 1e-6;
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+
+    enableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+
+    AlignedWriteUtil.insertData();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      // TODO currently aligned data in memory doesn't support deletion, so we flush all data to
+      // disk before doing deletion
+      statement.execute("flush");
+      statement.execute("delete timeseries root.sg1.d1.s2");
+      statement.execute("delete from root.sg1.d1.s1 where time <= 21");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void countAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"11"};
+    String[] columnNames = {"count(root.sg1.d1.s4)"};
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select count(s4) from root.sg1.d1 where s4 = true");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void countAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"0", "9", "11", "6"};
+    String[] columnNames = {
+      "count(root.sg1.d1.s1)",
+      "count(root.sg1.d1.s3)",
+      "count(root.sg1.d1.s4)",
+      "count(root.sg1.d1.s5)"
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select count(*) from root.sg1.d1 where s4 = true");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void aggregationAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"0", "25", "1"};
+    String[] columnNames = {
+      "count(root.sg1.d1.s1)", "max_time(root.sg1.d1.s4)", "min_value(root.sg1.d1.s3)",
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select count(s1), max_time(s4), min_value(s3) from root.sg1.d1 where s4 = true");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}