You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/15 10:14:25 UTC
[iotdb] branch clusterQueryOpt updated: Improve aggregation with
value filter:
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/clusterQueryOpt by this push:
new d3a4d78 Improve aggregation with value filter:
d3a4d78 is described below
commit d3a4d78a95c8b398782a1c73b94c2fa4e8cde0e9
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Mar 15 18:13:55 2021 +0800
Improve aggregation with value filter:
---
.../db/query/aggregation/AggregateResult.java | 3 ++
.../db/query/aggregation/impl/AvgAggrResult.java | 9 +++++
.../db/query/aggregation/impl/CountAggrResult.java | 14 ++++++++
.../aggregation/impl/FirstValueAggrResult.java | 14 ++++++++
.../aggregation/impl/FirstValueDescAggrResult.java | 11 +++++++
.../aggregation/impl/LastValueAggrResult.java | 11 +++++++
.../aggregation/impl/LastValueDescAggrResult.java | 14 ++++++++
.../query/aggregation/impl/MaxTimeAggrResult.java | 10 ++++++
.../aggregation/impl/MaxTimeDescAggrResult.java | 13 ++++++++
.../query/aggregation/impl/MaxValueAggrResult.java | 14 ++++++--
.../query/aggregation/impl/MinTimeAggrResult.java | 13 ++++++++
.../aggregation/impl/MinTimeDescAggrResult.java | 10 ++++++
.../query/aggregation/impl/MinValueAggrResult.java | 14 ++++++--
.../db/query/aggregation/impl/SumAggrResult.java | 9 +++++
.../db/query/executor/AggregationExecutor.java | 38 ++++++++++++++--------
15 files changed, 177 insertions(+), 20 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7b94a93..226bb42 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -94,6 +94,9 @@ public abstract class AggregateResult {
public abstract void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException;
+ /** Updates the aggregation result using values already read at the given timestamps. */
+ public abstract void updateResultUsingValues(long[] timestamps, int length, Object[] values);
+
/**
* Judge if aggregation results have been calculated. In other words, if the aggregated result
* does not need to compute the remaining data, it returns true.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 6e068b8..23842cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -110,6 +110,15 @@ public class AvgAggrResult extends AggregateResult {
}
}
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ updateAvg(seriesDataType, values[i]);
+ }
+ }
+ }
+
private void updateAvg(TSDataType type, Object sumVal) throws UnSupportedDataTypeException {
double val;
switch (type) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 30b4659..7b40fda 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -87,6 +87,20 @@ public class CountAggrResult extends AggregateResult {
}
@Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ int cnt = 0;
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ cnt++;
+ }
+ }
+
+ long preValue = getLongValue();
+ preValue += cnt;
+ setLongValue(preValue);
+ }
+
+ @Override
public boolean hasFinalResult() {
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 6703c7e..f7576a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -107,6 +107,20 @@ public class FirstValueAggrResult extends AggregateResult {
}
@Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ if (hasFinalResult()) {
+ return;
+ }
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ setValue(values[i]);
+ timestamp = timestamps[i];
+ break;
+ }
+ }
+ }
+
+ @Override
public boolean hasFinalResult() {
return hasCandidateResult;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 91d71a6..8eae923 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -64,6 +64,17 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
}
@Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ setValue(values[i]);
+ timestamp = timestamps[i];
+ return;
+ }
+ }
+ }
+
+ @Override
public boolean hasFinalResult() {
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 04cb67e..ad06ace 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -96,6 +96,17 @@ public class LastValueAggrResult extends AggregateResult {
}
@Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ timestamp = timestamps[i];
+ setValue(values[i]);
+ return;
+ }
+ }
+ }
+
+ @Override
public boolean hasFinalResult() {
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index d236eab..da2a953 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -81,6 +81,20 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
}
@Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ if (hasFinalResult()) {
+ return;
+ }
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ timestamp = timestamps[i];
+ setValue(values[i]);
+ return;
+ }
+ }
+ }
+
+ @Override
public boolean hasFinalResult() {
return hasCandidateResult;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 321bc78..fad90de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -76,6 +76,16 @@ public class MaxTimeAggrResult extends AggregateResult {
}
@Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ updateMaxTimeResult(timestamps[i]);
+ return;
+ }
+ }
+ }
+
+ @Override
public boolean hasFinalResult() {
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index a5c4e47..148503d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -64,6 +64,19 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
}
@Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ if (hasFinalResult()) {
+ return;
+ }
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ updateMaxTimeResult(timestamps[i]);
+ return;
+ }
+ }
+ }
+
+ @Override
public boolean hasFinalResult() {
return hasCandidateResult;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index 8f59994..34d9622 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -74,10 +74,18 @@ public class MaxValueAggrResult extends AggregateResult {
Comparable<Object> maxVal = null;
Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- if (values[i] == null) {
- continue;
+ if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
+ maxVal = (Comparable<Object>) values[i];
}
- if (maxVal == null || maxVal.compareTo(values[i]) < 0) {
+ }
+ updateResult(maxVal);
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ Comparable<Object> maxVal = null;
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
maxVal = (Comparable<Object>) values[i];
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 3864c08..1a63ee1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -86,6 +86,19 @@ public class MinTimeAggrResult extends AggregateResult {
}
@Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ if (hasFinalResult()) {
+ return;
+ }
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ setLongValue(timestamps[i]);
+ return;
+ }
+ }
+ }
+
+ @Override
public boolean hasFinalResult() {
return hasCandidateResult;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 2e65be3..6b45f14 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -53,6 +53,16 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
}
@Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = length - 1; i >= 0; i--) {
+ if (values[i] != null) {
+ setLongValue(timestamps[i]);
+ return;
+ }
+ }
+ }
+
+ @Override
public boolean hasFinalResult() {
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index 761fbac..eefbbb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -69,10 +69,18 @@ public class MinValueAggrResult extends AggregateResult {
Comparable<Object> minVal = null;
Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- if (values[i] == null) {
- continue;
+ if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
+ minVal = (Comparable<Object>) values[i];
}
- if (minVal == null || minVal.compareTo(values[i]) > 0) {
+ }
+ updateResult(minVal);
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ Comparable<Object> minVal = null;
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
minVal = (Comparable<Object>) values[i];
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index d441d76..8a11502 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -88,6 +88,15 @@ public class SumAggrResult extends AggregateResult {
}
}
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ for (int i = 0; i < length; i++) {
+ if (values[i] != null) {
+ updateSum(values[i]);
+ }
+ }
+ }
+
private void updateSum(Object sumVal) throws UnSupportedDataTypeException {
double preValue = getDoubleValue();
switch (seriesDataType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 4c54ac9..0b0d010 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -58,6 +58,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
@SuppressWarnings("java:S1135") // ignore todos
@@ -349,14 +350,20 @@ public class AggregationExecutor {
this.ascending = false;
}
TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);
- List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
+ // group aggregation indexes by series path so that each series gets a single reader
+ Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
+ groupAggregationsBySeries(selectedSeries);
+ Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
try {
for (int i = 0; i < selectedSeries.size(); i++) {
PartialPath path = selectedSeries.get(i);
- IReaderByTimestamp seriesReaderByTimestamp =
- getReaderByTime(path, queryPlan, dataTypes.get(i), context);
- readersOfSelectedSeries.add(seriesReaderByTimestamp);
+ List<Integer> indexes = pathToAggrIndexesMap.remove(path);
+ if (indexes != null) {
+ IReaderByTimestamp seriesReaderByTimestamp =
+ getReaderByTime(path, queryPlan, dataTypes.get(i), context);
+ readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
+ }
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
@@ -369,7 +376,7 @@ public class AggregationExecutor {
AggregateResultFactory.getAggrResultByName(aggregations.get(i), type, ascending);
aggregateResults.add(result);
}
- aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries);
+ aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
return constructDataSet(aggregateResults, queryPlan);
}
@@ -395,7 +402,7 @@ public class AggregationExecutor {
private void aggregateWithValueFilter(
List<AggregateResult> aggregateResults,
TimeGenerator timestampGenerator,
- List<IReaderByTimestamp> readersOfSelectedSeries)
+ Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
throws IOException {
while (timestampGenerator.hasNext()) {
@@ -411,11 +418,16 @@ public class AggregationExecutor {
}
// cal part of aggregate result
- for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
- aggregateResults
- .get(i)
- .updateResultUsingTimestamps(
- timeArray, timeArrayLength, readersOfSelectedSeries.get(i));
+ for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
+ if (entry.getValue().size() == 1) {
+ aggregateResults
+ .get(entry.getValue().get(0))
+ .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
+ } else {
+ Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+ for (Integer i : entry.getValue())
+ aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+ }
}
}
}
@@ -474,9 +486,7 @@ public class AggregationExecutor {
Map<PartialPath, List<Integer>> pathToAggrIndexesMap = new HashMap<>();
for (int i = 0; i < selectedSeries.size(); i++) {
PartialPath series = selectedSeries.get(i);
- List<Integer> indexList =
- pathToAggrIndexesMap.computeIfAbsent(series, key -> new ArrayList<>());
- indexList.add(i);
+ pathToAggrIndexesMap.computeIfAbsent(series, key -> new ArrayList<>()).add(i);
}
return pathToAggrIndexesMap;
}