You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/03/01 23:53:54 UTC
[pinot] branch master updated: Fixes filter aggs perf (#10356)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9c7e771d22 Fixes filter aggs perf (#10356)
9c7e771d22 is described below
commit 9c7e771d22a0cbd3391593e9976669fff4d4b442
Author: Evan Galpin <eg...@users.noreply.github.com>
AuthorDate: Wed Mar 1 15:53:47 2023 -0800
Fixes filter aggs perf (#10356)
---
.../operator/filter/CombinedFilterOperator.java | 3 +++
.../function/AggregationFunctionUtils.java | 18 +++++++++------
.../pinot/queries/FilteredAggregationsTest.java | 27 ++++++++++++++++++++++
...erSegmentAggregationSingleValueQueriesTest.java | 12 +++++-----
...terSegmentAggregationMultiValueQueriesTest.java | 2 +-
...SegmentAggregationMultiValueRawQueriesTest.java | 2 +-
6 files changed, 49 insertions(+), 15 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
index c0fa2fa04f..9b1b0e0672 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
@@ -59,6 +59,9 @@ public class CombinedFilterOperator extends BaseFilterOperator {
@Override
protected FilterBlock getNextBlock() {
Tracing.activeRecording().setNumChildren(2);
+ if (_mainFilterOperator instanceof MatchAllFilterOperator) {
+ return _subFilterOperator.nextBlock();
+ }
BlockDocIdSet mainFilterDocIdSet = _mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet();
BlockDocIdSet subFilterDocIdSet = _subFilterOperator.nextBlock().getBlockDocIdSet();
return new FilterBlock(new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, subFilterDocIdSet), _queryOptions));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 89e9d3d8e5..99393d30f1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -223,10 +223,11 @@ public class AggregationFunctionUtils {
// If it is, populate the corresponding filter operator and corresponding transform operator
assert aggregationFunctions != null;
for (Pair<AggregationFunction, FilterContext> inputPair : aggregationFunctions) {
- if (inputPair.getLeft() != null) {
- FilterContext currentFilterExpression = inputPair.getRight();
+ AggregationFunction aggFunc = inputPair.getLeft();
+ FilterContext currentFilterExpression = inputPair.getRight();
+ if (currentFilterExpression != null) {
if (filterContextToAggFuncsMap.get(currentFilterExpression) != null) {
- filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(inputPair.getLeft());
+ filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(aggFunc);
continue;
}
Pair<FilterPlanNode, BaseFilterOperator> filterPlanOpPair =
@@ -241,10 +242,10 @@ public class AggregationFunctionUtils {
// fetching the relevant TransformOperator when resolving blocks during aggregation
// execution
List<AggregationFunction> aggFunctionList = new ArrayList<>();
- aggFunctionList.add(inputPair.getLeft());
+ aggFunctionList.add(aggFunc);
filterContextToAggFuncsMap.put(currentFilterExpression, Pair.of(aggFunctionList, newTransformOperator));
} else {
- nonFilteredAggregationFunctions.add(inputPair.getLeft());
+ nonFilteredAggregationFunctions.add(aggFunc);
}
}
// Convert to array since FilteredGroupByOperator expects it
@@ -255,8 +256,11 @@ public class AggregationFunctionUtils {
}
aggToTransformOpList.add(Pair.of(aggregationFunctionList.toArray(new AggregationFunction[0]), pair.getRight()));
}
- aggToTransformOpList.add(
- Pair.of(nonFilteredAggregationFunctions.toArray(new AggregationFunction[0]), mainTransformOperator));
+
+ if (!nonFilteredAggregationFunctions.isEmpty()) {
+ aggToTransformOpList.add(
+ Pair.of(nonFilteredAggregationFunctions.toArray(new AggregationFunction[0]), mainTransformOperator));
+ }
return aggToTransformOpList;
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
index 2ea664ec67..d3636e4fc9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
@@ -94,6 +94,7 @@ public class FilteredAggregationsTest extends BaseQueriesTest {
invertedIndexCols.add(INT_COL_NAME);
indexLoadingConfig.setInvertedIndexColumns(invertedIndexCols);
+ indexLoadingConfig.setRangeIndexColumns(invertedIndexCols);
ImmutableSegment firstImmutableSegment =
ImmutableSegmentLoader.load(new File(INDEX_DIR, FIRST_SEGMENT_NAME), indexLoadingConfig);
ImmutableSegment secondImmutableSegment =
@@ -446,4 +447,30 @@ public class FilteredAggregationsTest extends BaseQueriesTest {
+ "ORDER BY testAvg";
testQuery(filterQuery, nonFilterQuery);
}
+
+ @Test
+ public void testSameNumScannedFilteredAggMatchAll() {
+ // For a single filtered aggregation, the same number of docs should be scanned regardless of which portions of
+ // the filter are in the filter expression vs. the main predicate, i.e. the applied filters are commutative.
+ String filterQuery =
+ "SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable";
+ String nonFilterQuery =
+ "SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000";
+ long filterQueryDocsScanned = getBrokerResponse(filterQuery).getNumDocsScanned();
+ long nonFilterQueryDocsScanned = getBrokerResponse(nonFilterQuery).getNumDocsScanned();
+ assertEquals(filterQueryDocsScanned, nonFilterQueryDocsScanned);
+ }
+
+ @Test
+ public void testSameNumScannedFilteredAgg() {
+ // For a single filtered aggregation, the same number of docs should be scanned regardless of which portions of
+ // the filter are in the filter expression vs. the main predicate, i.e. the applied filters are commutative.
+ String filterQuery =
+ "SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable WHERE INT_COL < 1000000";
+ String nonFilterQuery =
+ "SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000 AND INT_COL < 1000000";
+ long filterQueryDocsScanned = getBrokerResponse(filterQuery).getNumDocsScanned();
+ long nonFilterQueryDocsScanned = getBrokerResponse(nonFilterQuery).getNumDocsScanned();
+ assertEquals(filterQueryDocsScanned, nonFilterQueryDocsScanned);
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
index 8d0e187521..a59008be4c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
@@ -66,8 +66,8 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
+ "FROM testTable WHERE column3 > 0";
FilteredAggregationOperator aggregationOperator = getOperator(query);
AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 180000L, 0L,
- 540000L, 30000L);
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 150000L, 0L,
+ 450000L, 30000L);
QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 22266008882250L, 30000, 2147419555,
32289159189150L, 28175373944314L, 30000L);
@@ -76,8 +76,8 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
+ "FROM testTable";
aggregationOperator = getOperator(query);
resultsBlock = aggregationOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 180000L, 0L,
- 540000L, 30000L);
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 150000L, 0L,
+ 450000L, 30000L);
QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 22266008882250L, 30000, 2147419555,
32289159189150L, 28175373944314L, 30000L);
@@ -86,8 +86,8 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
+ "SUM(column3), AVG(column7) FILTER(WHERE column7 > 0 AND column7 < 100) FROM testTable";
aggregationOperator = getOperator(query);
resultsBlock = aggregationOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 150000L, 0L,
- 450000L, 30000L);
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 120000L, 0L,
+ 360000L, 30000L);
QueriesTestUtils.testInnerSegmentAggregationResult(resultsBlock.getResults(), 22266008882250L, 30000, 2147419555,
32289159189150L, 0L, 0L);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index 4d32358814..c4abd67d4e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -595,7 +595,7 @@ public class InterSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
ResultTable expectedResultTable =
new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{370236L}));
- QueriesTestUtils.testInterSegmentsResult(brokerResponse, 740472L, 400000L, 0L, 400000L, expectedResultTable);
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 370236L, 0L, 0L, 400000L, expectedResultTable);
}
@Test
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
index 06d89e6573..df2dbb4e4e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
@@ -534,7 +534,7 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
ResultTable expectedResultTable =
new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{370236L}));
- QueriesTestUtils.testInterSegmentsResult(brokerResponse, 740472L, 400000L, 0L, 400000L, expectedResultTable);
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 370236L, 0L, 0L, 400000L, expectedResultTable);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org