You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/26 01:39:26 UTC
[iotdb] 01/01: reimplement value iterator to update aggr result
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch aggrWithValueFilter
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 04996ef36bed10fd8cd6e01827ec00545df371db
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Nov 25 11:19:58 2021 +0800
reimplement value iterator to update aggr result
---
.../db/query/aggregation/AggregateResult.java | 4 +-
.../db/query/aggregation/impl/AvgAggrResult.java | 9 +-
.../db/query/aggregation/impl/CountAggrResult.java | 11 ++-
.../query/aggregation/impl/ExtremeAggrResult.java | 7 +-
.../aggregation/impl/FirstValueAggrResult.java | 11 +--
.../aggregation/impl/FirstValueDescAggrResult.java | 7 +-
.../aggregation/impl/LastValueAggrResult.java | 7 +-
.../aggregation/impl/LastValueDescAggrResult.java | 12 ++-
.../query/aggregation/impl/MaxTimeAggrResult.java | 5 +-
.../aggregation/impl/MaxTimeDescAggrResult.java | 10 +--
.../query/aggregation/impl/MaxValueAggrResult.java | 10 ++-
.../query/aggregation/impl/MinTimeAggrResult.java | 10 +--
.../aggregation/impl/MinTimeDescAggrResult.java | 5 +-
.../query/aggregation/impl/MinValueAggrResult.java | 10 ++-
.../db/query/aggregation/impl/SumAggrResult.java | 9 +-
.../db/query/executor/AggregationExecutor.java | 98 ++++++++++++++++------
.../iotdb/db/utils/AlignedValueIterator.java | 46 ++++++++++
.../org/apache/iotdb/db/utils/ValueIterator.java | 56 +++++++++++++
18 files changed, 236 insertions(+), 91 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index efd9aee..7daa1e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -98,7 +99,8 @@ public abstract class AggregateResult {
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException;
/** This method calculates the aggregation using values that have been calculated */
- public abstract void updateResultUsingValues(long[] timestamps, int length, Object[] values);
+ public abstract void updateResultUsingValues(
+ long[] timestamps, int length, ValueIterator valueIterator);
/**
* Judge if aggregation results have been calculated. In other words, if the aggregated result
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index a1fffa6..aebe259 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -110,11 +111,9 @@ public class AvgAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- updateAvg(seriesDataType, values[i]);
- }
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+ while (valueIterator.hasNext()) {
+ updateAvg(seriesDataType, valueIterator.next());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 57a3a7e..971c76a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -57,7 +58,6 @@ public class CountAggrResult extends AggregateResult {
public void updateResultFromPageData(
IBatchDataIterator batchIterator, long minBound, long maxBound) {
int cnt = 0;
- int count = batchIterator.totalLength();
while (batchIterator.hasNext()) {
if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
break;
@@ -82,12 +82,11 @@ public class CountAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
int cnt = 0;
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- cnt++;
- }
+ while (valueIterator.hasNext()) {
+ valueIterator.next();
+ cnt++;
}
setLongValue(getLongValue() + cnt);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
index 144fe00..0a0c165 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -125,10 +126,10 @@ public class ExtremeAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
Comparable<Object> extVal = null;
- for (int i = 0; i < length; i++) {
- extVal = getExtremeValue(extVal, (Comparable<Object>) values[i]);
+ while (valueIterator.hasNext()) {
+ extVal = getExtremeValue(extVal, (Comparable<Object>) valueIterator.next());
}
updateResult(extVal);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 233abdc..be5bd9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -107,16 +108,12 @@ public class FirstValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
if (hasFinalResult()) {
return;
}
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- setValue(values[i]);
- timestamp = timestamps[i];
- break;
- }
+ if (valueIterator.hasNext()) {
+ setValue(valueIterator.next());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 3092818..afcd23f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -65,10 +66,10 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
for (int i = length - 1; i >= 0; i--) {
- if (values[i] != null) {
- setValue(values[i]);
+ if (valueIterator.get(i) != null) {
+ setValue(valueIterator.get(i));
timestamp = timestamps[i];
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 443751c..d369f79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -97,11 +98,11 @@ public class LastValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
for (int i = length - 1; i >= 0; i--) {
- if (values[i] != null) {
+ if (valueIterator.get(i) != null) {
timestamp = timestamps[i];
- setValue(values[i]);
+ setValue(valueIterator.get(i));
return;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 981167b..7bc236c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -87,16 +88,13 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
if (hasFinalResult()) {
return;
}
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- timestamp = timestamps[i];
- setValue(values[i]);
- return;
- }
+ if (valueIterator.hasNext()) {
+ timestamp = timestamps[valueIterator.getCurPos()];
+ setValue(valueIterator.next());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 46ebe15..d990b8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -77,9 +78,9 @@ public class MaxTimeAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
for (int i = length - 1; i >= 0; i--) {
- if (values[i] != null) {
+ if (valueIterator.get(i) != null) {
updateMaxTimeResult(timestamps[i]);
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index e867bf7..1dfc1ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -70,15 +71,12 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
if (hasFinalResult()) {
return;
}
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- updateMaxTimeResult(timestamps[i]);
- return;
- }
+ if (valueIterator.hasNext()) {
+ updateMaxTimeResult(timestamps[valueIterator.getCurPos()]);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index a61583b..f939d9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -83,11 +84,12 @@ public class MaxValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
Comparable<Object> maxVal = null;
- for (int i = 0; i < length; i++) {
- if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
- maxVal = (Comparable<Object>) values[i];
+ while (valueIterator.hasNext()) {
+ Object value = valueIterator.next();
+ if (maxVal == null || maxVal.compareTo(value) < 0) {
+ maxVal = (Comparable<Object>) value;
}
}
updateResult(maxVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 4d0365f..0cf2205 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -92,15 +93,12 @@ public class MinTimeAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
if (hasFinalResult()) {
return;
}
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- setLongValue(timestamps[i]);
- return;
- }
+ if (valueIterator.hasNext()) {
+ setLongValue(timestamps[valueIterator.getCurPos()]);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 9abceb5..aac888d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -54,9 +55,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
for (int i = length - 1; i >= 0; i--) {
- if (values[i] != null) {
+ if (valueIterator.get(i) != null) {
setLongValue(timestamps[i]);
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index f3c01ed..8b9e1e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -78,11 +79,12 @@ public class MinValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
Comparable<Object> minVal = null;
- for (int i = 0; i < length; i++) {
- if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
- minVal = (Comparable<Object>) values[i];
+ while (valueIterator.hasNext()) {
+ Object value = valueIterator.next();
+ if (minVal == null || minVal.compareTo(value) > 0) {
+ minVal = (Comparable<Object>) value;
}
}
updateResult(minVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index a047c33..46cd1b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
@@ -91,11 +92,9 @@ public class SumAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- updateSum(values[i]);
- }
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+ while (valueIterator.hasNext()) {
+ updateSum(valueIterator.next());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 53c218a..94f4f3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -44,7 +44,9 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.utils.AlignedValueIterator;
import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -55,10 +57,12 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -588,18 +592,25 @@ public class AggregationExecutor {
// group by path name
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
groupAggregationsBySeries(selectedSeries);
- Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
+ Map<PartialPath, List<List<Integer>>> vectorPathIndexesMap =
+ groupVectorSeries(pathToAggrIndexesMap);
+ Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap = new HashMap<>();
List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
+
try {
- for (int i = 0; i < selectedSeries.size(); i++) {
- PartialPath path = selectedSeries.get(i);
- List<Integer> indexes = pathToAggrIndexesMap.remove(path);
- if (indexes != null) {
- IReaderByTimestamp seriesReaderByTimestamp =
- getReaderByTime(path, queryPlan, dataTypes.get(i), context);
- readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
- }
+ for (PartialPath path : pathToAggrIndexesMap.keySet()) {
+ IReaderByTimestamp seriesReaderByTimestamp =
+ getReaderByTime(path, queryPlan, path.getSeriesType(), context);
+ readerToAggrIndexesMap.put(
+ seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path)));
}
+ pathToAggrIndexesMap.clear();
+ for (PartialPath vectorPath : vectorPathIndexesMap.keySet()) {
+ IReaderByTimestamp seriesReaderByTimestamp =
+ getReaderByTime(vectorPath, queryPlan, vectorPath.getSeriesType(), context);
+ readerToAggrIndexesMap.put(seriesReaderByTimestamp, vectorPathIndexesMap.get(vectorPath));
+ }
+ vectorPathIndexesMap.clear();
} finally {
StorageEngine.getInstance().mergeUnLock(list);
}
@@ -649,7 +660,7 @@ public class AggregationExecutor {
/** calculate aggregation result with value filter. */
private void aggregateWithValueFilter(
TimeGenerator timestampGenerator,
- Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
+ Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap)
throws IOException {
List<Boolean> cached =
markFilterdPaths(
@@ -668,23 +679,38 @@ public class AggregationExecutor {
}
// cal part of aggregate result
- for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
- int pathId = entry.getValue().get(0);
- // cache in timeGenerator
- if (cached.get(pathId)) {
- Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
- for (Integer i : entry.getValue()) {
- aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
+ for (Entry<IReaderByTimestamp, List<List<Integer>>> entry :
+ readerToAggrIndexesMap.entrySet()) {
+ // use cache data as much as possible
+ boolean[] cachedOrNot = new boolean[entry.getValue().size()];
+ for (int i = 0; i < entry.getValue().size(); i++) {
+ List<Integer> subIndexes = entry.getValue().get(i);
+ int pathId = subIndexes.get(0);
+ // if cached in timeGenerator
+ if (cached.get(pathId)) {
+ // TODO: need to get exact path class?
+ Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
+ ValueIterator valueIterator = generateValueIterator(values);
+ for (Integer index : subIndexes) {
+ aggregateResultList[index].updateResultUsingValues(
+ timeArray, timeArrayLength, valueIterator);
+ }
+ cachedOrNot[i] = true;
}
- } else {
- if (entry.getValue().size() == 1) {
- aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps(
- timeArray, timeArrayLength, entry.getKey());
- } else {
- Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
- if (values != null) {
- for (Integer i : entry.getValue()) {
- aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
+ }
+ // TODO: if size = 1, we only need to get the exact number of values for specific aggregate
+ if (hasRemaining(cachedOrNot)) {
+ Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+ if (values != null) {
+ ValueIterator valueIterator = generateValueIterator(values);
+ for (int i = 0; i < entry.getValue().size(); i++) {
+ if (!cachedOrNot[i]) {
+ valueIterator.setSubMeasurementIndex(i);
+ for (Integer index : entry.getValue().get(i)) {
+ aggregateResultList[index].updateResultUsingValues(
+ timeArray, timeArrayLength, valueIterator);
+ valueIterator.reset();
+ }
}
}
}
@@ -693,6 +719,24 @@ public class AggregationExecutor {
}
}
+ private ValueIterator generateValueIterator(Object[] values) {
+ if (values[0] instanceof TsPrimitiveType[]) {
+ return new AlignedValueIterator(values);
+ } else {
+ return new ValueIterator(values);
+ }
+ }
+
+ /** Return whether there is result that has not been cached */
+ private boolean hasRemaining(boolean[] cachedOrNot) {
+ for (int i = 0; i < cachedOrNot.length; i++) {
+ if (!cachedOrNot[i]) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* using aggregate result data list construct QueryDataSet.
*
@@ -766,7 +810,7 @@ public class AggregationExecutor {
result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes);
} else {
// groupPath is changed here so we update it
- List<List<Integer>> subIndexes = result.remove(groupPath);
+ List<List<Integer>> subIndexes = result.get(groupPath);
subIndexes.add(indexes);
groupPath.addMeasurements(exactPath.getMeasurementList());
groupPath.addSchemas(exactPath.getSchemaList());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
new file mode 100644
index 0000000..b47da5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+/** Used for value object is instance of TsPrimitiveType[] */
+public class AlignedValueIterator extends ValueIterator {
+
+ int subMeasurementIndex;
+
+ public AlignedValueIterator(Object[] values) {
+ super(values);
+ }
+
+ public void setSubMeasurementIndex(int subMeasurementIndex) {
+ this.subMeasurementIndex = subMeasurementIndex;
+ }
+
+ @Override
+ public Object next() {
+ return ((TsPrimitiveType[]) values[curPos++])[subMeasurementIndex];
+ }
+
+ @Override
+ public Object get(int index) {
+ return ((TsPrimitiveType[]) values[index])[subMeasurementIndex];
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java
new file mode 100644
index 0000000..d870f7f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+public class ValueIterator {
+
+ // Object: TsPrimitiveType[] or common data type
+ protected Object[] values;
+ protected int curPos = 0;
+
+ public ValueIterator(Object[] values) {
+ this.values = values;
+ }
+
+ public boolean hasNext() {
+ while (curPos < values.length && values[curPos] == null) {
+ curPos++;
+ }
+ return curPos < values.length;
+ }
+
+ public void setSubMeasurementIndex(int subMeasurementIndex) {}
+
+ public Object next() {
+ return values[curPos++];
+ }
+
+ public Object get(int index) {
+ return values[index];
+ }
+
+ public int getCurPos() {
+ return curPos;
+ }
+
+ public void reset() {
+ this.curPos = 0;
+ }
+}