You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/26 01:39:25 UTC

[iotdb] branch aggrWithValueFilter created (now 04996ef)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch aggrWithValueFilter
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 04996ef  reimplement value iterator to update aggr result

This branch includes the following new commits:

     new 04996ef  reimplement value iterator to update aggr result

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: reimplement value iterator to update aggr result

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggrWithValueFilter
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 04996ef36bed10fd8cd6e01827ec00545df371db
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Nov 25 11:19:58 2021 +0800

    reimplement value iterator to update aggr result
---
 .../db/query/aggregation/AggregateResult.java      |  4 +-
 .../db/query/aggregation/impl/AvgAggrResult.java   |  9 +-
 .../db/query/aggregation/impl/CountAggrResult.java | 11 ++-
 .../query/aggregation/impl/ExtremeAggrResult.java  |  7 +-
 .../aggregation/impl/FirstValueAggrResult.java     | 11 +--
 .../aggregation/impl/FirstValueDescAggrResult.java |  7 +-
 .../aggregation/impl/LastValueAggrResult.java      |  7 +-
 .../aggregation/impl/LastValueDescAggrResult.java  | 12 ++-
 .../query/aggregation/impl/MaxTimeAggrResult.java  |  5 +-
 .../aggregation/impl/MaxTimeDescAggrResult.java    | 10 +--
 .../query/aggregation/impl/MaxValueAggrResult.java | 10 ++-
 .../query/aggregation/impl/MinTimeAggrResult.java  | 10 +--
 .../aggregation/impl/MinTimeDescAggrResult.java    |  5 +-
 .../query/aggregation/impl/MinValueAggrResult.java | 10 ++-
 .../db/query/aggregation/impl/SumAggrResult.java   |  9 +-
 .../db/query/executor/AggregationExecutor.java     | 98 ++++++++++++++++------
 .../iotdb/db/utils/AlignedValueIterator.java       | 46 ++++++++++
 .../org/apache/iotdb/db/utils/ValueIterator.java   | 56 +++++++++++++
 18 files changed, 236 insertions(+), 91 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index efd9aee..7daa1e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -98,7 +99,8 @@ public abstract class AggregateResult {
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException;
 
   /** This method calculates the aggregation using values that have been calculated */
-  public abstract void updateResultUsingValues(long[] timestamps, int length, Object[] values);
+  public abstract void updateResultUsingValues(
+      long[] timestamps, int length, ValueIterator valueIterator);
 
   /**
    * Judge if aggregation results have been calculated. In other words, if the aggregated result
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index a1fffa6..aebe259 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -110,11 +111,9 @@ public class AvgAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        updateAvg(seriesDataType, values[i]);
-      }
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+    while (valueIterator.hasNext()) {
+      updateAvg(seriesDataType, valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 57a3a7e..971c76a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -57,7 +58,6 @@ public class CountAggrResult extends AggregateResult {
   public void updateResultFromPageData(
       IBatchDataIterator batchIterator, long minBound, long maxBound) {
     int cnt = 0;
-    int count = batchIterator.totalLength();
     while (batchIterator.hasNext()) {
       if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
         break;
@@ -82,12 +82,11 @@ public class CountAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     int cnt = 0;
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        cnt++;
-      }
+    while (valueIterator.hasNext()) {
+      valueIterator.next();
+      cnt++;
     }
     setLongValue(getLongValue() + cnt);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
index 144fe00..0a0c165 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -125,10 +126,10 @@ public class ExtremeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     Comparable<Object> extVal = null;
-    for (int i = 0; i < length; i++) {
-      extVal = getExtremeValue(extVal, (Comparable<Object>) values[i]);
+    while (valueIterator.hasNext()) {
+      extVal = getExtremeValue(extVal, (Comparable<Object>) valueIterator.next());
     }
     updateResult(extVal);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 233abdc..be5bd9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -107,16 +108,12 @@ public class FirstValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        setValue(values[i]);
-        timestamp = timestamps[i];
-        break;
-      }
+    if (valueIterator.hasNext()) {
+      setValue(valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 3092818..afcd23f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -65,10 +66,10 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
-        setValue(values[i]);
+      if (valueIterator.get(i) != null) {
+        setValue(valueIterator.get(i));
         timestamp = timestamps[i];
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 443751c..d369f79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -97,11 +98,11 @@ public class LastValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
+      if (valueIterator.get(i) != null) {
         timestamp = timestamps[i];
-        setValue(values[i]);
+        setValue(valueIterator.get(i));
         return;
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 981167b..7bc236c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -87,16 +88,13 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        timestamp = timestamps[i];
-        setValue(values[i]);
-        return;
-      }
+    if (valueIterator.hasNext()) {
+      timestamp = timestamps[valueIterator.getCurPos()];
+      setValue(valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 46ebe15..d990b8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -77,9 +78,9 @@ public class MaxTimeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
+      if (valueIterator.get(i) != null) {
         updateMaxTimeResult(timestamps[i]);
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index e867bf7..1dfc1ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
@@ -70,15 +71,12 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        updateMaxTimeResult(timestamps[i]);
-        return;
-      }
+    if (valueIterator.hasNext()) {
+      updateMaxTimeResult(timestamps[valueIterator.getCurPos()]);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index a61583b..f939d9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -83,11 +84,12 @@ public class MaxValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     Comparable<Object> maxVal = null;
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
-        maxVal = (Comparable<Object>) values[i];
+    while (valueIterator.hasNext()) {
+      Object value = valueIterator.next();
+      if (maxVal == null || maxVal.compareTo(value) < 0) {
+        maxVal = (Comparable<Object>) value;
       }
     }
     updateResult(maxVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 4d0365f..0cf2205 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -92,15 +93,12 @@ public class MinTimeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        setLongValue(timestamps[i]);
-        return;
-      }
+    if (valueIterator.hasNext()) {
+      setLongValue(timestamps[valueIterator.getCurPos()]);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 9abceb5..aac888d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
@@ -54,9 +55,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
+      if (valueIterator.get(i) != null) {
         setLongValue(timestamps[i]);
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index f3c01ed..8b9e1e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -78,11 +79,12 @@ public class MinValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     Comparable<Object> minVal = null;
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
-        minVal = (Comparable<Object>) values[i];
+    while (valueIterator.hasNext()) {
+      Object value = valueIterator.next();
+      if (minVal == null || minVal.compareTo(value) > 0) {
+        minVal = (Comparable<Object>) value;
       }
     }
     updateResult(minVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index a047c33..46cd1b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
@@ -91,11 +92,9 @@ public class SumAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        updateSum(values[i]);
-      }
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+    while (valueIterator.hasNext()) {
+      updateSum(valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 53c218a..94f4f3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -44,7 +44,9 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.utils.AlignedValueIterator;
 import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -55,10 +57,12 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -588,18 +592,25 @@ public class AggregationExecutor {
     // group by path name
     Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
         groupAggregationsBySeries(selectedSeries);
-    Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
+    Map<PartialPath, List<List<Integer>>> vectorPathIndexesMap =
+        groupVectorSeries(pathToAggrIndexesMap);
+    Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap = new HashMap<>();
     List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
+
     try {
-      for (int i = 0; i < selectedSeries.size(); i++) {
-        PartialPath path = selectedSeries.get(i);
-        List<Integer> indexes = pathToAggrIndexesMap.remove(path);
-        if (indexes != null) {
-          IReaderByTimestamp seriesReaderByTimestamp =
-              getReaderByTime(path, queryPlan, dataTypes.get(i), context);
-          readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
-        }
+      for (PartialPath path : pathToAggrIndexesMap.keySet()) {
+        IReaderByTimestamp seriesReaderByTimestamp =
+            getReaderByTime(path, queryPlan, path.getSeriesType(), context);
+        readerToAggrIndexesMap.put(
+            seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path)));
       }
+      pathToAggrIndexesMap.clear();
+      for (PartialPath vectorPath : vectorPathIndexesMap.keySet()) {
+        IReaderByTimestamp seriesReaderByTimestamp =
+            getReaderByTime(vectorPath, queryPlan, vectorPath.getSeriesType(), context);
+        readerToAggrIndexesMap.put(seriesReaderByTimestamp, vectorPathIndexesMap.get(vectorPath));
+      }
+      vectorPathIndexesMap.clear();
     } finally {
       StorageEngine.getInstance().mergeUnLock(list);
     }
@@ -649,7 +660,7 @@ public class AggregationExecutor {
   /** calculate aggregation result with value filter. */
   private void aggregateWithValueFilter(
       TimeGenerator timestampGenerator,
-      Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
+      Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap)
       throws IOException {
     List<Boolean> cached =
         markFilterdPaths(
@@ -668,23 +679,38 @@ public class AggregationExecutor {
       }
 
       // cal part of aggregate result
-      for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
-        int pathId = entry.getValue().get(0);
-        // cache in timeGenerator
-        if (cached.get(pathId)) {
-          Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
-          for (Integer i : entry.getValue()) {
-            aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
+      for (Entry<IReaderByTimestamp, List<List<Integer>>> entry :
+          readerToAggrIndexesMap.entrySet()) {
+        // use cache data as much as possible
+        boolean[] cachedOrNot = new boolean[entry.getValue().size()];
+        for (int i = 0; i < entry.getValue().size(); i++) {
+          List<Integer> subIndexes = entry.getValue().get(i);
+          int pathId = subIndexes.get(0);
+          // if cached in timeGenerator
+          if (cached.get(pathId)) {
+            // TODO: need to get exact path class?
+            Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
+            ValueIterator valueIterator = generateValueIterator(values);
+            for (Integer index : subIndexes) {
+              aggregateResultList[index].updateResultUsingValues(
+                  timeArray, timeArrayLength, valueIterator);
+            }
+            cachedOrNot[i] = true;
           }
-        } else {
-          if (entry.getValue().size() == 1) {
-            aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps(
-                timeArray, timeArrayLength, entry.getKey());
-          } else {
-            Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
-            if (values != null) {
-              for (Integer i : entry.getValue()) {
-                aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
+        }
+        // TODO: if size = 1, we only need to get the exact number of values for specific aggregate
+        if (hasRemaining(cachedOrNot)) {
+          Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+          if (values != null) {
+            ValueIterator valueIterator = generateValueIterator(values);
+            for (int i = 0; i < entry.getValue().size(); i++) {
+              if (!cachedOrNot[i]) {
+                valueIterator.setSubMeasurementIndex(i);
+                for (Integer index : entry.getValue().get(i)) {
+                  aggregateResultList[index].updateResultUsingValues(
+                      timeArray, timeArrayLength, valueIterator);
+                  valueIterator.reset();
+                }
               }
             }
           }
@@ -693,6 +719,24 @@ public class AggregationExecutor {
     }
   }
 
+  private ValueIterator generateValueIterator(Object[] values) {
+    if (values[0] instanceof TsPrimitiveType[]) {
+      return new AlignedValueIterator(values);
+    } else {
+      return new ValueIterator(values);
+    }
+  }
+
+  /** Return whether there is any result that has not been cached */
+  private boolean hasRemaining(boolean[] cachedOrNot) {
+    for (int i = 0; i < cachedOrNot.length; i++) {
+      if (!cachedOrNot[i]) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * using aggregate result data list construct QueryDataSet.
    *
@@ -766,7 +810,7 @@ public class AggregationExecutor {
           result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes);
         } else {
           // groupPath is changed here so we update it
-          List<List<Integer>> subIndexes = result.remove(groupPath);
+          List<List<Integer>> subIndexes = result.get(groupPath);
           subIndexes.add(indexes);
           groupPath.addMeasurements(exactPath.getMeasurementList());
           groupPath.addSchemas(exactPath.getSchemaList());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
new file mode 100644
index 0000000..b47da5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+/** Used when each value object is an instance of TsPrimitiveType[]. */
+public class AlignedValueIterator extends ValueIterator {
+
+  int subMeasurementIndex;
+
+  public AlignedValueIterator(Object[] values) {
+    super(values);
+  }
+
+  public void setSubMeasurementIndex(int subMeasurementIndex) {
+    this.subMeasurementIndex = subMeasurementIndex;
+  }
+
+  @Override
+  public Object next() {
+    return ((TsPrimitiveType[]) values[curPos++])[subMeasurementIndex];
+  }
+
+  @Override
+  public Object get(int index) {
+    return ((TsPrimitiveType[]) values[index])[subMeasurementIndex];
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java
new file mode 100644
index 0000000..d870f7f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+public class ValueIterator {
+
+  // Each element is either a TsPrimitiveType[] or a value of a common data type
+  protected Object[] values;
+  protected int curPos = 0;
+
+  public ValueIterator(Object[] values) {
+    this.values = values;
+  }
+
+  public boolean hasNext() {
+    while (curPos < values.length && values[curPos] == null) {
+      curPos++;
+    }
+    return curPos < values.length;
+  }
+
+  public void setSubMeasurementIndex(int subMeasurementIndex) {}
+
+  public Object next() {
+    return values[curPos++];
+  }
+
+  public Object get(int index) {
+    return values[index];
+  }
+
+  public int getCurPos() {
+    return curPos;
+  }
+
+  public void reset() {
+    this.curPos = 0;
+  }
+}