You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/26 01:39:26 UTC

[iotdb] 01/01: reimplement value iterator to update aggr result

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggrWithValueFilter
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 04996ef36bed10fd8cd6e01827ec00545df371db
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Nov 25 11:19:58 2021 +0800

    reimplement value iterator to update aggr result
---
 .../db/query/aggregation/AggregateResult.java      |  4 +-
 .../db/query/aggregation/impl/AvgAggrResult.java   |  9 +-
 .../db/query/aggregation/impl/CountAggrResult.java | 11 ++-
 .../query/aggregation/impl/ExtremeAggrResult.java  |  7 +-
 .../aggregation/impl/FirstValueAggrResult.java     | 11 +--
 .../aggregation/impl/FirstValueDescAggrResult.java |  7 +-
 .../aggregation/impl/LastValueAggrResult.java      |  7 +-
 .../aggregation/impl/LastValueDescAggrResult.java  | 12 ++-
 .../query/aggregation/impl/MaxTimeAggrResult.java  |  5 +-
 .../aggregation/impl/MaxTimeDescAggrResult.java    | 10 +--
 .../query/aggregation/impl/MaxValueAggrResult.java | 10 ++-
 .../query/aggregation/impl/MinTimeAggrResult.java  | 10 +--
 .../aggregation/impl/MinTimeDescAggrResult.java    |  5 +-
 .../query/aggregation/impl/MinValueAggrResult.java | 10 ++-
 .../db/query/aggregation/impl/SumAggrResult.java   |  9 +-
 .../db/query/executor/AggregationExecutor.java     | 98 ++++++++++++++++------
 .../iotdb/db/utils/AlignedValueIterator.java       | 46 ++++++++++
 .../org/apache/iotdb/db/utils/ValueIterator.java   | 56 +++++++++++++
 18 files changed, 236 insertions(+), 91 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index efd9aee..7daa1e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -98,7 +99,8 @@ public abstract class AggregateResult {
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException;
 
   /** This method calculates the aggregation using values that have been calculated */
-  public abstract void updateResultUsingValues(long[] timestamps, int length, Object[] values);
+  public abstract void updateResultUsingValues(
+      long[] timestamps, int length, ValueIterator valueIterator);
 
   /**
    * Judge if aggregation results have been calculated. In other words, if the aggregated result
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index a1fffa6..aebe259 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -110,11 +111,9 @@ public class AvgAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        updateAvg(seriesDataType, values[i]);
-      }
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+    while (valueIterator.hasNext()) {
+      updateAvg(seriesDataType, valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 57a3a7e..971c76a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -57,7 +58,6 @@ public class CountAggrResult extends AggregateResult {
   public void updateResultFromPageData(
       IBatchDataIterator batchIterator, long minBound, long maxBound) {
     int cnt = 0;
-    int count = batchIterator.totalLength();
     while (batchIterator.hasNext()) {
       if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
         break;
@@ -82,12 +82,11 @@ public class CountAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     int cnt = 0;
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        cnt++;
-      }
+    while (valueIterator.hasNext()) {
+      valueIterator.next();
+      cnt++;
     }
     setLongValue(getLongValue() + cnt);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
index 144fe00..0a0c165 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -125,10 +126,10 @@ public class ExtremeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     Comparable<Object> extVal = null;
-    for (int i = 0; i < length; i++) {
-      extVal = getExtremeValue(extVal, (Comparable<Object>) values[i]);
+    while (valueIterator.hasNext()) {
+      extVal = getExtremeValue(extVal, (Comparable<Object>) valueIterator.next());
     }
     updateResult(extVal);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 233abdc..be5bd9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -107,16 +108,14 @@ public class FirstValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        setValue(values[i]);
-        timestamp = timestamps[i];
-        break;
-      }
+    if (valueIterator.hasNext()) {
+      // hasNext() stops on the first non-null value; record its time before next() advances
+      timestamp = timestamps[valueIterator.getCurPos()];
+      setValue(valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 3092818..afcd23f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -65,10 +66,10 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
-        setValue(values[i]);
+      if (valueIterator.get(i) != null) {
+        setValue(valueIterator.get(i));
         timestamp = timestamps[i];
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 443751c..d369f79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -97,11 +98,11 @@ public class LastValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
+      if (valueIterator.get(i) != null) {
         timestamp = timestamps[i];
-        setValue(values[i]);
+        setValue(valueIterator.get(i));
         return;
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 981167b..7bc236c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -87,16 +88,13 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        timestamp = timestamps[i];
-        setValue(values[i]);
-        return;
-      }
+    if (valueIterator.hasNext()) {
+      timestamp = timestamps[valueIterator.getCurPos()];
+      setValue(valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 46ebe15..d990b8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -77,9 +78,9 @@ public class MaxTimeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
+      if (valueIterator.get(i) != null) {
         updateMaxTimeResult(timestamps[i]);
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index e867bf7..1dfc1ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
@@ -70,15 +71,12 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        updateMaxTimeResult(timestamps[i]);
-        return;
-      }
+    if (valueIterator.hasNext()) {
+      updateMaxTimeResult(timestamps[valueIterator.getCurPos()]);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index a61583b..f939d9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -83,11 +84,12 @@ public class MaxValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     Comparable<Object> maxVal = null;
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
-        maxVal = (Comparable<Object>) values[i];
+    while (valueIterator.hasNext()) {
+      Object value = valueIterator.next();
+      if (maxVal == null || maxVal.compareTo(value) < 0) {
+        maxVal = (Comparable<Object>) value;
       }
     }
     updateResult(maxVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 4d0365f..0cf2205 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -92,15 +93,12 @@ public class MinTimeAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     if (hasFinalResult()) {
       return;
     }
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        setLongValue(timestamps[i]);
-        return;
-      }
+    if (valueIterator.hasNext()) {
+      setLongValue(timestamps[valueIterator.getCurPos()]);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 9abceb5..aac888d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.aggregation.impl;
 
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 
@@ -54,9 +55,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     for (int i = length - 1; i >= 0; i--) {
-      if (values[i] != null) {
+      if (valueIterator.get(i) != null) {
         setLongValue(timestamps[i]);
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index f3c01ed..8b9e1e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -78,11 +79,12 @@ public class MinValueAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
     Comparable<Object> minVal = null;
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
-        minVal = (Comparable<Object>) values[i];
+    while (valueIterator.hasNext()) {
+      Object value = valueIterator.next();
+      if (minVal == null || minVal.compareTo(value) > 0) {
+        minVal = (Comparable<Object>) value;
       }
     }
     updateResult(minVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index a047c33..46cd1b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
@@ -91,11 +92,9 @@ public class SumAggrResult extends AggregateResult {
   }
 
   @Override
-  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
-    for (int i = 0; i < length; i++) {
-      if (values[i] != null) {
-        updateSum(values[i]);
-      }
+  public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+    while (valueIterator.hasNext()) {
+      updateSum(valueIterator.next());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 53c218a..94f4f3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -44,7 +44,9 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.utils.AlignedValueIterator;
 import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.db.utils.ValueIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -55,10 +57,12 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -588,18 +592,25 @@ public class AggregationExecutor {
     // group by path name
     Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
         groupAggregationsBySeries(selectedSeries);
-    Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
+    Map<PartialPath, List<List<Integer>>> vectorPathIndexesMap =
+        groupVectorSeries(pathToAggrIndexesMap);
+    Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap = new HashMap<>();
     List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
+
     try {
-      for (int i = 0; i < selectedSeries.size(); i++) {
-        PartialPath path = selectedSeries.get(i);
-        List<Integer> indexes = pathToAggrIndexesMap.remove(path);
-        if (indexes != null) {
-          IReaderByTimestamp seriesReaderByTimestamp =
-              getReaderByTime(path, queryPlan, dataTypes.get(i), context);
-          readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
-        }
+      for (PartialPath path : pathToAggrIndexesMap.keySet()) {
+        IReaderByTimestamp seriesReaderByTimestamp =
+            getReaderByTime(path, queryPlan, path.getSeriesType(), context);
+        readerToAggrIndexesMap.put(
+            seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path)));
       }
+      pathToAggrIndexesMap.clear();
+      for (PartialPath vectorPath : vectorPathIndexesMap.keySet()) {
+        IReaderByTimestamp seriesReaderByTimestamp =
+            getReaderByTime(vectorPath, queryPlan, vectorPath.getSeriesType(), context);
+        readerToAggrIndexesMap.put(seriesReaderByTimestamp, vectorPathIndexesMap.get(vectorPath));
+      }
+      vectorPathIndexesMap.clear();
     } finally {
       StorageEngine.getInstance().mergeUnLock(list);
     }
@@ -649,7 +660,7 @@ public class AggregationExecutor {
   /** calculate aggregation result with value filter. */
   private void aggregateWithValueFilter(
       TimeGenerator timestampGenerator,
-      Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
+      Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap)
       throws IOException {
     List<Boolean> cached =
         markFilterdPaths(
@@ -668,23 +679,40 @@ public class AggregationExecutor {
       }
 
       // cal part of aggregate result
-      for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
-        int pathId = entry.getValue().get(0);
-        // cache in timeGenerator
-        if (cached.get(pathId)) {
-          Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
-          for (Integer i : entry.getValue()) {
-            aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
+      for (Entry<IReaderByTimestamp, List<List<Integer>>> entry :
+          readerToAggrIndexesMap.entrySet()) {
+        // use cache data as much as possible
+        boolean[] cachedOrNot = new boolean[entry.getValue().size()];
+        for (int i = 0; i < entry.getValue().size(); i++) {
+          List<Integer> subIndexes = entry.getValue().get(i);
+          int pathId = subIndexes.get(0);
+          // if cached in timeGenerator
+          if (cached.get(pathId)) {
+            // TODO: need to get exact path class?
+            Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
+            ValueIterator valueIterator = generateValueIterator(values);
+            for (Integer index : subIndexes) {
+              aggregateResultList[index].updateResultUsingValues(
+                  timeArray, timeArrayLength, valueIterator);
+              // rewind so the next aggregation on this series sees every cached value
+              valueIterator.reset();
+            }
+            cachedOrNot[i] = true;
           }
-        } else {
-          if (entry.getValue().size() == 1) {
-            aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps(
-                timeArray, timeArrayLength, entry.getKey());
-          } else {
-            Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
-            if (values != null) {
-              for (Integer i : entry.getValue()) {
-                aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
+        }
+        // TODO: if size = 1, we only need to get the exact number of values for specific aggregate
+        if (hasRemaining(cachedOrNot)) {
+          Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+          if (values != null) {
+            ValueIterator valueIterator = generateValueIterator(values);
+            for (int i = 0; i < entry.getValue().size(); i++) {
+              if (!cachedOrNot[i]) {
+                valueIterator.setSubMeasurementIndex(i);
+                for (Integer index : entry.getValue().get(i)) {
+                  aggregateResultList[index].updateResultUsingValues(
+                      timeArray, timeArrayLength, valueIterator);
+                  valueIterator.reset();
+                }
               }
             }
           }
@@ -693,6 +719,34 @@ public class AggregationExecutor {
     }
   }
 
+  /**
+   * Wraps {@code values} in the iterator matching their element type: an {@link
+   * AlignedValueIterator} when the elements are {@code TsPrimitiveType[]} rows, a plain {@link
+   * ValueIterator} otherwise.
+   */
+  private ValueIterator generateValueIterator(Object[] values) {
+    // Leading entries may be null, so decide on the first non-null element.
+    for (Object value : values) {
+      if (value != null) {
+        return value instanceof TsPrimitiveType[]
+            ? new AlignedValueIterator(values)
+            : new ValueIterator(values);
+      }
+    }
+    // Empty or all-null input: a plain iterator simply reports no elements.
+    return new ValueIterator(values);
+  }
+
+  /** Returns whether at least one entry of {@code cachedOrNot} is still false. */
+  private boolean hasRemaining(boolean[] cachedOrNot) {
+    for (boolean isCached : cachedOrNot) {
+      if (!isCached) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * using aggregate result data list construct QueryDataSet.
    *
@@ -766,7 +810,7 @@ public class AggregationExecutor {
           result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes);
         } else {
           // groupPath is changed here so we update it
-          List<List<Integer>> subIndexes = result.remove(groupPath);
+          List<List<Integer>> subIndexes = result.get(groupPath);
           subIndexes.add(indexes);
           groupPath.addMeasurements(exactPath.getMeasurementList());
           groupPath.addSchemas(exactPath.getSchemaList());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
new file mode 100644
index 0000000..b47da5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+/**
+ * Value iterator for aligned series: every entry of {@code values} is a {@code
+ * TsPrimitiveType[]} row, and the sub-measurement index selects the column that is read.
+ */
+public class AlignedValueIterator extends ValueIterator {
+
+  int subMeasurementIndex;
+
+  public AlignedValueIterator(Object[] values) {
+    super(values);
+  }
+
+  @Override
+  public void setSubMeasurementIndex(int subMeasurementIndex) {
+    this.subMeasurementIndex = subMeasurementIndex;
+  }
+
+  /** Skips rows that are null or whose selected cell is null. */
+  @Override
+  public boolean hasNext() {
+    while (curPos < values.length && cellAt(curPos) == null) {
+      curPos++;
+    }
+    return curPos < values.length;
+  }
+
+  /**
+   * Returns the unwrapped value of the current cell and advances; call {@link #hasNext()} first.
+   * Unwrapping keeps the result type in line with what a plain {@link ValueIterator} yields.
+   */
+  @Override
+  public Object next() {
+    return cellAt(curPos++).getValue();
+  }
+
+  /** Returns the unwrapped value at {@code index}, or null when the row or cell is missing. */
+  @Override
+  public Object get(int index) {
+    TsPrimitiveType cell = cellAt(index);
+    return cell == null ? null : cell.getValue();
+  }
+
+  /** Selected cell of the row at {@code index}, or null when the row itself is null. */
+  private TsPrimitiveType cellAt(int index) {
+    TsPrimitiveType[] row = (TsPrimitiveType[]) values[index];
+    return row == null ? null : row[subMeasurementIndex];
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java
new file mode 100644
index 0000000..d870f7f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import java.util.NoSuchElementException;
+
+/**
+ * Forward cursor over an array of values that skips {@code null} entries, plus index-based access
+ * through {@link #get(int)} for callers that scan the array themselves.
+ */
+public class ValueIterator {
+
+  // Object: TsPrimitiveType[] or common data type
+  protected final Object[] values;
+  protected int curPos = 0;
+
+  public ValueIterator(Object[] values) {
+    this.values = values;
+  }
+
+  /**
+   * Advances the cursor past any {@code null} entries.
+   *
+   * @return true if a non-null value remains at the cursor
+   */
+  public boolean hasNext() {
+    while (curPos < values.length && values[curPos] == null) {
+      curPos++;
+    }
+    return curPos < values.length;
+  }
+
+  /** No-op for plain values; AlignedValueIterator uses it to select a sub-measurement. */
+  public void setSubMeasurementIndex(int subMeasurementIndex) {}
+
+  /**
+   * Returns the next non-null value and moves the cursor past it.
+   *
+   * @throws NoSuchElementException if no non-null value remains
+   */
+  public Object next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException("no more values");
+    }
+    return values[curPos++];
+  }
+
+  /** Returns the entry at {@code index}, which may be null; the cursor is not moved. */
+  public Object get(int index) {
+    return values[index];
+  }
+
+  /** Returns the cursor position; right after a true hasNext() it indexes the next value. */
+  public int getCurPos() {
+    return curPos;
+  }
+
+  /** Moves the cursor back to the first entry so the values can be iterated again. */
+  public void reset() {
+    this.curPos = 0;
+  }
+}