You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/15 10:14:25 UTC

[iotdb] branch clusterQueryOpt updated: Improve aggregation with value filter:

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/clusterQueryOpt by this push:
     new d3a4d78  Improve aggregation with value filter:
d3a4d78 is described below

commit d3a4d78a95c8b398782a1c73b94c2fa4e8cde0e9
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon Mar 15 18:13:55 2021 +0800

    Improve aggregation with value filter:
---
 .../db/query/aggregation/AggregateResult.java      |  3 ++
 .../db/query/aggregation/impl/AvgAggrResult.java   |  9 +++++
 .../db/query/aggregation/impl/CountAggrResult.java | 14 ++++++++
 .../aggregation/impl/FirstValueAggrResult.java     | 14 ++++++++
 .../aggregation/impl/FirstValueDescAggrResult.java | 11 +++++++
 .../aggregation/impl/LastValueAggrResult.java      | 11 +++++++
 .../aggregation/impl/LastValueDescAggrResult.java  | 14 ++++++++
 .../query/aggregation/impl/MaxTimeAggrResult.java  | 10 ++++++
 .../aggregation/impl/MaxTimeDescAggrResult.java    | 13 ++++++++
 .../query/aggregation/impl/MaxValueAggrResult.java | 14 ++++++--
 .../query/aggregation/impl/MinTimeAggrResult.java  | 13 ++++++++
 .../aggregation/impl/MinTimeDescAggrResult.java    | 10 ++++++
 .../query/aggregation/impl/MinValueAggrResult.java | 14 ++++++--
 .../db/query/aggregation/impl/SumAggrResult.java   |  9 +++++
 .../db/query/executor/AggregationExecutor.java     | 38 ++++++++++++++--------
 15 files changed, 177 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 7b94a93..226bb42 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -94,6 +94,9 @@ public abstract class AggregateResult {
   public abstract void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException;
 
+  /** This method calculates the aggregation using values that have been calculated */
+  public abstract void updateResultUsingValues(long[] timestamps, int length, Object[] values);
+
   /**
    * Judge if aggregation results have been calculated. In other words, if the aggregated result
    * does not need to compute the remaining data, it returns true.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 6e068b8..23842cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -110,6 +110,15 @@ public class AvgAggrResult extends AggregateResult {
     }
   }
 
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        updateAvg(seriesDataType, values[i]);
+      }
+    }
+  }
+
   private void updateAvg(TSDataType type, Object sumVal) throws UnSupportedDataTypeException {
     double val;
     switch (type) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 30b4659..7b40fda 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -87,6 +87,20 @@ public class CountAggrResult extends AggregateResult {
   }
 
   @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    int cnt = 0;
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        cnt++;
+      }
+    }
+
+    long preValue = getLongValue();
+    preValue += cnt;
+    setLongValue(preValue);
+  }
+
+  @Override
   public boolean hasFinalResult() {
     return false;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 6703c7e..f7576a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -107,6 +107,20 @@ public class FirstValueAggrResult extends AggregateResult {
   }
 
   @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    if (hasFinalResult()) {
+      return;
+    }
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        setValue(values[i]);
+        timestamp = timestamps[i];
+        break;
+      }
+    }
+  }
+
+  @Override
   public boolean hasFinalResult() {
     return hasCandidateResult;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 91d71a6..8eae923 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -64,6 +64,17 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
   }
 
   @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        setValue(values[i]);
+        timestamp = timestamps[i];
+        return;
+      }
+    }
+  }
+
+  @Override
   public boolean hasFinalResult() {
     return false;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 04cb67e..ad06ace 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -96,6 +96,17 @@ public class LastValueAggrResult extends AggregateResult {
   }
 
   @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        timestamp = timestamps[i];
+        setValue(values[i]);
+        return;
+      }
+    }
+  }
+
+  @Override
   public boolean hasFinalResult() {
     return false;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index d236eab..da2a953 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -81,6 +81,20 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
   }
 
   @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    if (hasFinalResult()) {
+      return;
+    }
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        timestamp = timestamps[i];
+        setValue(values[i]);
+        return;
+      }
+    }
+  }
+
+  @Override
   public boolean hasFinalResult() {
     return hasCandidateResult;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 321bc78..fad90de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -76,6 +76,16 @@ public class MaxTimeAggrResult extends AggregateResult {
   }
 
   @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        updateMaxTimeResult(timestamps[i]);
+        return;
+      }
+    }
+  }
+
+  @Override
   public boolean hasFinalResult() {
     return false;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index a5c4e47..148503d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -64,6 +64,19 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
   }
 
   @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    if (hasFinalResult()) {
+      return;
+    }
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        updateMaxTimeResult(timestamps[i]);
+        return;
+      }
+    }
+  }
+
+  @Override
   public boolean hasFinalResult() {
     return hasCandidateResult;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index 8f59994..34d9622 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -74,10 +74,18 @@ public class MaxValueAggrResult extends AggregateResult {
     Comparable<Object> maxVal = null;
     Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      if (values[i] == null) {
-        continue;
+      if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
+        maxVal = (Comparable<Object>) values[i];
       }
-      if (maxVal == null || maxVal.compareTo(values[i]) < 0) {
+    }
+    updateResult(maxVal);
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    Comparable<Object> maxVal = null;
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
         maxVal = (Comparable<Object>) values[i];
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 3864c08..1a63ee1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -86,6 +86,19 @@ public class MinTimeAggrResult extends AggregateResult {
   }
 
   @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    if (hasFinalResult()) {
+      return;
+    }
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        setLongValue(timestamps[i]);
+        return;
+      }
+    }
+  }
+
+  @Override
   public boolean hasFinalResult() {
     return hasCandidateResult;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 2e65be3..6b45f14 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -53,6 +53,16 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
   }
 
   @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = length - 1; i >= 0; i--) {
+      if (values[i] != null) {
+        setLongValue(timestamps[i]);
+        return;
+      }
+    }
+  }
+
+  @Override
   public boolean hasFinalResult() {
     return false;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index 761fbac..eefbbb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -69,10 +69,18 @@ public class MinValueAggrResult extends AggregateResult {
     Comparable<Object> minVal = null;
     Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      if (values[i] == null) {
-        continue;
+      if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
+        minVal = (Comparable<Object>) values[i];
       }
-      if (minVal == null || minVal.compareTo(values[i]) > 0) {
+    }
+    updateResult(minVal);
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    Comparable<Object> minVal = null;
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
         minVal = (Comparable<Object>) values[i];
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index d441d76..8a11502 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -88,6 +88,15 @@ public class SumAggrResult extends AggregateResult {
     }
   }
 
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    for (int i = 0; i < length; i++) {
+      if (values[i] != null) {
+        updateSum(values[i]);
+      }
+    }
+  }
+
   private void updateSum(Object sumVal) throws UnSupportedDataTypeException {
     double preValue = getDoubleValue();
     switch (seriesDataType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 4c54ac9..0b0d010 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -58,6 +58,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 @SuppressWarnings("java:S1135") // ignore todos
@@ -349,14 +350,20 @@ public class AggregationExecutor {
       this.ascending = false;
     }
     TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);
-    List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
+    // group by path name
+    Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
+        groupAggregationsBySeries(selectedSeries);
+    Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
     List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
     try {
       for (int i = 0; i < selectedSeries.size(); i++) {
         PartialPath path = selectedSeries.get(i);
-        IReaderByTimestamp seriesReaderByTimestamp =
-            getReaderByTime(path, queryPlan, dataTypes.get(i), context);
-        readersOfSelectedSeries.add(seriesReaderByTimestamp);
+        List<Integer> indexes = pathToAggrIndexesMap.remove(path);
+        if (indexes != null) {
+          IReaderByTimestamp seriesReaderByTimestamp =
+              getReaderByTime(path, queryPlan, dataTypes.get(i), context);
+          readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
+        }
       }
     } finally {
       StorageEngine.getInstance().mergeUnLock(list);
@@ -369,7 +376,7 @@ public class AggregationExecutor {
           AggregateResultFactory.getAggrResultByName(aggregations.get(i), type, ascending);
       aggregateResults.add(result);
     }
-    aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries);
+    aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
     return constructDataSet(aggregateResults, queryPlan);
   }
 
@@ -395,7 +402,7 @@ public class AggregationExecutor {
   private void aggregateWithValueFilter(
       List<AggregateResult> aggregateResults,
       TimeGenerator timestampGenerator,
-      List<IReaderByTimestamp> readersOfSelectedSeries)
+      Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
       throws IOException {
 
     while (timestampGenerator.hasNext()) {
@@ -411,11 +418,16 @@ public class AggregationExecutor {
       }
 
       // cal part of aggregate result
-      for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
-        aggregateResults
-            .get(i)
-            .updateResultUsingTimestamps(
-                timeArray, timeArrayLength, readersOfSelectedSeries.get(i));
+      for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
+        if (entry.getValue().size() == 1) {
+          aggregateResults
+              .get(entry.getValue().get(0))
+              .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
+        } else {
+          Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+          for (Integer i : entry.getValue())
+            aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
+        }
       }
     }
   }
@@ -474,9 +486,7 @@ public class AggregationExecutor {
     Map<PartialPath, List<Integer>> pathToAggrIndexesMap = new HashMap<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
       PartialPath series = selectedSeries.get(i);
-      List<Integer> indexList =
-          pathToAggrIndexesMap.computeIfAbsent(series, key -> new ArrayList<>());
-      indexList.add(i);
+      pathToAggrIndexesMap.computeIfAbsent(series, key -> new ArrayList<>()).add(i);
     }
     return pathToAggrIndexesMap;
   }