You are viewing a plain-text version of this content. The hyperlink to its canonical version ("here") was not preserved in this copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/03/01 23:53:54 UTC

[pinot] branch master updated: Fixes filter aggs perf (#10356)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c7e771d22 Fixes filter aggs perf (#10356)
9c7e771d22 is described below

commit 9c7e771d22a0cbd3391593e9976669fff4d4b442
Author: Evan Galpin <eg...@users.noreply.github.com>
AuthorDate: Wed Mar 1 15:53:47 2023 -0800

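    Fixes filter aggs perf (#10356)

    Summary: CombinedFilterOperator now returns the FILTER clause's block directly when the main filter is a
    MatchAllFilterOperator. AggregationFunctionUtils now treats aggregations without a FILTER clause as unfiltered
    (it previously checked the aggregation function rather than the filter for null), and skips the unfiltered
    group when it is empty.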
---
 .../operator/filter/CombinedFilterOperator.java    |  3 +++
 .../function/AggregationFunctionUtils.java         | 18 +++++++++------
 .../pinot/queries/FilteredAggregationsTest.java    | 27 ++++++++++++++++++++++
 ...erSegmentAggregationSingleValueQueriesTest.java | 12 +++++-----
 ...terSegmentAggregationMultiValueQueriesTest.java |  2 +-
 ...SegmentAggregationMultiValueRawQueriesTest.java |  2 +-
 6 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
index c0fa2fa04f..9b1b0e0672 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
@@ -59,6 +59,9 @@ public class CombinedFilterOperator extends BaseFilterOperator {
   @Override
   protected FilterBlock getNextBlock() {
     Tracing.activeRecording().setNumChildren(2);
+    if (_mainFilterOperator instanceof MatchAllFilterOperator) {
+      return _subFilterOperator.nextBlock();
+    }
     BlockDocIdSet mainFilterDocIdSet = _mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet();
     BlockDocIdSet subFilterDocIdSet = _subFilterOperator.nextBlock().getBlockDocIdSet();
     return new FilterBlock(new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, subFilterDocIdSet), _queryOptions));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 89e9d3d8e5..99393d30f1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -223,10 +223,11 @@ public class AggregationFunctionUtils {
     // If it is, populate the corresponding filter operator and corresponding transform operator
     assert aggregationFunctions != null;
     for (Pair<AggregationFunction, FilterContext> inputPair : aggregationFunctions) {
-      if (inputPair.getLeft() != null) {
-        FilterContext currentFilterExpression = inputPair.getRight();
+      AggregationFunction aggFunc = inputPair.getLeft();
+      FilterContext currentFilterExpression = inputPair.getRight();
+      if (currentFilterExpression != null) {
         if (filterContextToAggFuncsMap.get(currentFilterExpression) != null) {
-          filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(inputPair.getLeft());
+          filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(aggFunc);
           continue;
         }
         Pair<FilterPlanNode, BaseFilterOperator> filterPlanOpPair =
@@ -241,10 +242,10 @@ public class AggregationFunctionUtils {
         // fetching the relevant TransformOperator when resolving blocks during aggregation
         // execution
         List<AggregationFunction> aggFunctionList = new ArrayList<>();
-        aggFunctionList.add(inputPair.getLeft());
+        aggFunctionList.add(aggFunc);
         filterContextToAggFuncsMap.put(currentFilterExpression, Pair.of(aggFunctionList, newTransformOperator));
       } else {
-        nonFilteredAggregationFunctions.add(inputPair.getLeft());
+        nonFilteredAggregationFunctions.add(aggFunc);
       }
     }
     // Convert to array since FilteredGroupByOperator expects it
@@ -255,8 +256,11 @@ public class AggregationFunctionUtils {
       }
       aggToTransformOpList.add(Pair.of(aggregationFunctionList.toArray(new AggregationFunction[0]), pair.getRight()));
     }
-    aggToTransformOpList.add(
-        Pair.of(nonFilteredAggregationFunctions.toArray(new AggregationFunction[0]), mainTransformOperator));
+
+    if (!nonFilteredAggregationFunctions.isEmpty()) {
+      aggToTransformOpList.add(
+          Pair.of(nonFilteredAggregationFunctions.toArray(new AggregationFunction[0]), mainTransformOperator));
+    }
 
     return aggToTransformOpList;
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
index 2ea664ec67..d3636e4fc9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
@@ -94,6 +94,7 @@ public class FilteredAggregationsTest extends BaseQueriesTest {
     invertedIndexCols.add(INT_COL_NAME);
 
     indexLoadingConfig.setInvertedIndexColumns(invertedIndexCols);
+    indexLoadingConfig.setRangeIndexColumns(invertedIndexCols);
     ImmutableSegment firstImmutableSegment =
         ImmutableSegmentLoader.load(new File(INDEX_DIR, FIRST_SEGMENT_NAME), indexLoadingConfig);
     ImmutableSegment secondImmutableSegment =
@@ -446,4 +447,30 @@ public class FilteredAggregationsTest extends BaseQueriesTest {
             + "ORDER BY testAvg";
     testQuery(filterQuery, nonFilterQuery);
   }
+
+  @Test
+  public void testSameNumScannedFilteredAggMatchAll() {
+    // With no WHERE clause, a single filtered aggregation should scan the same number of docs as the equivalent
+    // query that moves the FILTER predicate into the WHERE clause. The main filter here is presumably match-all
+    // (verify against the planner), in which case CombinedFilterOperator returns the FILTER block directly instead
+    // of ANDing it with a match-all doc id set.
+    String filterQuery =
+        "SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable";
+    String nonFilterQuery =
+        "SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000";
+    long filterQueryDocsScanned = getBrokerResponse(filterQuery).getNumDocsScanned();
+    long nonFilterQueryDocsScanned = getBrokerResponse(nonFilterQuery).getNumDocsScanned();
+    assertEquals(filterQueryDocsScanned, nonFilterQueryDocsScanned);
+  }
+
+  @Test
+  public void testSameNumScannedFilteredAgg() {
+    // For a single filtered aggregation, moving part of the predicate between the FILTER clause and the main WHERE
+    // clause should not change the number of docs scanned, i.e. the applied filters are commutative. Here the
+    // filtered query splits the range across FILTER and WHERE, while the reference query puts both bounds in WHERE.
+    String filterQuery =
+        "SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable WHERE INT_COL < 1000000";
+    String nonFilterQuery =
+        "SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000 AND INT_COL < 1000000";
+    long filterQueryDocsScanned = getBrokerResponse(filterQuery).getNumDocsScanned();
+    long nonFilterQueryDocsScanned = getBrokerResponse(nonFilterQuery).getNumDocsScanned();
+    assertEquals(filterQueryDocsScanned, nonFilterQueryDocsScanned);
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
index 8d0e187521..a59008be4c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
@@ -66,8 +66,8 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
         + "FROM testTable WHERE column3 > 0";
     FilteredAggregationOperator aggregationOperator = getOperator(query);
     AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
-    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 180000L, 0L,
-        540000L, 30000L);
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 150000L, 0L,
+        450000L, 30000L);
     QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 22266008882250L, 30000, 2147419555,
         32289159189150L, 28175373944314L, 30000L);
 
@@ -76,8 +76,8 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
         + "FROM testTable";
     aggregationOperator = getOperator(query);
     resultsBlock = aggregationOperator.nextBlock();
-    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 180000L, 0L,
-        540000L, 30000L);
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 150000L, 0L,
+        450000L, 30000L);
     QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 22266008882250L, 30000, 2147419555,
         32289159189150L, 28175373944314L, 30000L);
 
@@ -86,8 +86,8 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
         + "SUM(column3), AVG(column7) FILTER(WHERE column7 > 0 AND column7 < 100) FROM testTable";
     aggregationOperator = getOperator(query);
     resultsBlock = aggregationOperator.nextBlock();
-    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 150000L, 0L,
-        450000L, 30000L);
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 120000L, 0L,
+        360000L, 30000L);
     QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 22266008882250L, 30000, 2147419555,
         32289159189150L, 0L, 0L);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index 4d32358814..c4abd67d4e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -595,7 +595,7 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
     ResultTable expectedResultTable =
         new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{370236L}));
-    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 740472L, 400000L, 0L, 400000L, expectedResultTable);
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 370236L, 0L, 0L, 400000L, expectedResultTable);
   }
 
   @Test
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
index 06d89e6573..df2dbb4e4e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
@@ -534,7 +534,7 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
     ResultTable expectedResultTable =
         new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{370236L}));
-    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 740472L, 400000L, 0L, 400000L, expectedResultTable);
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 370236L, 0L, 0L, 400000L, expectedResultTable);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org