You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/30 22:33:42 UTC
[pinot] 01/01: Aggregation Filter
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch aggregation_filter
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 9a2b052d63cbf3802cc24a40d2d9cefd5912cac4
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Thu Dec 30 23:32:33 2021 +0100
Aggregation Filter
---
.../pinot/core/operator/blocks/FilterBlock.java | 15 ++-
.../core/operator/docidsets/BitmapDocIdSet.java | 12 ++-
.../operator/docidsets/FilterBlockDocIdSet.java | 40 ++++++++
...pDocIdSet.java => RangelessBitmapDocIdSet.java} | 20 ++--
.../operator/filter/CombinedFilterOperator.java | 66 ++++++++++++
.../query/FilteredAggregationOperator.java | 111 +++++++++++++++++++++
6 files changed, 248 insertions(+), 16 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java
index 1f87255..aa97bc4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java
@@ -30,14 +30,25 @@ import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
*/
public class FilterBlock implements Block {
private final FilterBlockDocIdSet _filterBlockDocIdSet;
+ private FilterBlockDocIdSet _nonScanFilterBlockDocIdSet;
public FilterBlock(FilterBlockDocIdSet filterBlockDocIdSet) {
_filterBlockDocIdSet = filterBlockDocIdSet;
}
+ /**
+ * Pre-scans the documents if needed, and returns a non-scan-based FilterBlockDocIdSet.
+ * <p>The conversion is performed lazily on the first call and cached; later calls (and getBlockDocIdSet()) return
+ * the cached set.
+ * <p>NOTE(review): the method name spells "BLock" with a capital L; consider renaming it (e.g. to
+ * getNonScanFilterBlockDocIdSet) before this API gets more callers.
+ * NOTE(review): the lazy initialization is not synchronized - assumes a block is only accessed by one thread.
+ */
+ public FilterBlockDocIdSet getNonScanFilterBLockDocIdSet() {
+ if (_nonScanFilterBlockDocIdSet == null) {
+ _nonScanFilterBlockDocIdSet = _filterBlockDocIdSet.toNonScanDocIdSet();
+ }
+ return _nonScanFilterBlockDocIdSet;
+ }
+
+ /**
+ * Returns the non-scan-based doc id set if getNonScanFilterBLockDocIdSet() has already been called, otherwise the
+ * doc id set passed to the constructor.
+ */
@Override
public FilterBlockDocIdSet getBlockDocIdSet() {
- return _filterBlockDocIdSet;
+ return _nonScanFilterBlockDocIdSet != null ? _nonScanFilterBlockDocIdSet : _filterBlockDocIdSet;
}
@Override
@@ -54,4 +65,4 @@ public class FilterBlock implements Block {
public BlockMetadata getMetadata() {
throw new UnsupportedOperationException();
}
-}
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
index eacd4e3..a69ac00 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
@@ -23,17 +23,19 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
public class BitmapDocIdSet implements FilterBlockDocIdSet {
- private final ImmutableRoaringBitmap _docIds;
- private final int _numDocs;
+ // Iterator created (or received) once and returned by every iterator() call
+ private final BitmapDocIdIterator _iterator;
+ /**
+ * Creates a doc id set over the given bitmap; the iterator is created eagerly here.
+ */
public BitmapDocIdSet(ImmutableRoaringBitmap docIds, int numDocs) {
- _docIds = docIds;
- _numDocs = numDocs;
+ _iterator = new BitmapDocIdIterator(docIds, numDocs);
+ }
+
+ /**
+ * Wraps an existing iterator (e.g. one already produced by an AND/OR doc id set) so that it is reused rather than
+ * re-created; used by FilterBlockDocIdSet#toNonScanDocIdSet().
+ */
+ public BitmapDocIdSet(BitmapDocIdIterator iterator) {
+ _iterator = iterator;
}
+ /**
+ * Returns the iterator created (or passed in) at construction time.
+ * <p>NOTE(review): unlike before this change, every call returns the same stateful iterator instead of a fresh one,
+ * so the doc ids presumably can only be traversed once - confirm no caller invokes iterator() more than once.
+ */
@Override
public BitmapDocIdIterator iterator() {
- return new BitmapDocIdIterator(_docIds, _numDocs);
+ return _iterator;
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java
index 92b6ac7..18dab2b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java
@@ -18,7 +18,16 @@
*/
package org.apache.pinot.core.operator.docidsets;
+import org.apache.pinot.core.common.BlockDocIdIterator;
import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.operator.dociditerators.AndDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.BitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.OrDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.segment.spi.Constants;
+import org.roaringbitmap.RoaringBitmapWriter;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
/**
@@ -32,4 +41,35 @@ public interface FilterBlockDocIdSet extends BlockDocIdSet {
* filtering phase. This method should be called after the filtering is done.
*/
long getNumEntriesScannedInFilter();
+
+ /**
+ * For scan-based FilterBlockDocIdSet, pre-scans the documents and returns a non-scan-based FilterBlockDocIdSet.
+ * <p>{@link #iterator()} is called exactly once here, and the result depends on the iterator type:
+ * <ul>
+ * <li>scan-based, AND and OR iterators are drained into a bitmap returned as a {@link RangelessBitmapDocIdSet};</li>
+ * <li>{@link RangelessBitmapDocIdIterator} and {@link BitmapDocIdIterator} are wrapped into a new doc id set that
+ * reuses the same iterator instance, so the work that produced it is not repeated;</li>
+ * <li>for any other iterator type, {@code this} is returned.</li>
+ * </ul>
+ * <p>NOTE(review): in the last case iterator() has already been invoked once; returning {@code this} assumes that a
+ * later iterator() call still yields a usable iterator - confirm for the remaining implementations.
+ */
+ default FilterBlockDocIdSet toNonScanDocIdSet() {
+ BlockDocIdIterator docIdIterator = iterator();
+
+ // NOTE: AND and OR DocIdIterator might contain scan-based DocIdIterator
+ // NOTE(review): AND/OR iterators are always drained here, even when none of their children is scan-based
+ // TODO: This scan is not counted in the execution stats
+ if (docIdIterator instanceof ScanBasedDocIdIterator || docIdIterator instanceof AndDocIdIterator
+ || docIdIterator instanceof OrDocIdIterator) {
+ // Collect every matching doc id into a mutable bitmap (run compression disabled on the writer)
+ RoaringBitmapWriter<MutableRoaringBitmap> bitmapWriter =
+ RoaringBitmapWriter.bufferWriter().runCompress(false).get();
+ int docId;
+ while ((docId = docIdIterator.next()) != Constants.EOF) {
+ bitmapWriter.add(docId);
+ }
+ return new RangelessBitmapDocIdSet(bitmapWriter.get());
+ }
+
+ // NOTE: AND and OR DocIdSet might return BitmapBasedDocIdIterator after processing the iterators. Create a new
+ // DocIdSet to prevent processing the iterators again
+ if (docIdIterator instanceof RangelessBitmapDocIdIterator) {
+ return new RangelessBitmapDocIdSet((RangelessBitmapDocIdIterator) docIdIterator);
+ }
+ if (docIdIterator instanceof BitmapDocIdIterator) {
+ return new BitmapDocIdSet((BitmapDocIdIterator) docIdIterator);
+ }
+
+ // Not scan-based and not bitmap-based: the set is used as is
+ return this;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java
similarity index 66%
copy from pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
copy to pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java
index eacd4e3..463a2df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java
@@ -18,22 +18,24 @@
*/
package org.apache.pinot.core.operator.docidsets;
-import org.apache.pinot.core.operator.dociditerators.BitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
-public class BitmapDocIdSet implements FilterBlockDocIdSet {
- private final ImmutableRoaringBitmap _docIds;
- private final int _numDocs;
+public class RangelessBitmapDocIdSet implements FilterBlockDocIdSet {
+ // Iterator created (or received) once and returned by every iterator() call
+ private final RangelessBitmapDocIdIterator _iterator;
- public BitmapDocIdSet(ImmutableRoaringBitmap docIds, int numDocs) {
- _docIds = docIds;
- _numDocs = numDocs;
+ /**
+ * Creates a doc id set over the doc ids of the given bitmap. Unlike BitmapDocIdSet, no numDocs argument is taken.
+ */
+ public RangelessBitmapDocIdSet(ImmutableRoaringBitmap docIds) {
+ _iterator = new RangelessBitmapDocIdIterator(docIds);
+ }
+
+ /**
+ * Wraps an existing iterator (e.g. one already produced by an AND/OR doc id set) so that it is reused rather than
+ * re-created; used by FilterBlockDocIdSet#toNonScanDocIdSet().
+ */
+ public RangelessBitmapDocIdSet(RangelessBitmapDocIdIterator iterator) {
+ _iterator = iterator;
}
+ /**
+ * Returns the iterator created (or passed in) at construction time; every call returns the same instance.
+ */
@Override
- public BitmapDocIdIterator iterator() {
- return new BitmapDocIdIterator(_docIds, _numDocs);
+ public RangelessBitmapDocIdIterator iterator() {
+ return _iterator;
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
new file mode 100644
index 0000000..54c26dc
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.AndDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+
+
+/**
+ * Filter operator that intersects (ANDs) the doc ids matched by a main filter with the doc ids matched by a sub
+ * filter. The main filter's doc id set is first converted into a non-scan-based set (pre-scanning if needed).
+ */
+public class CombinedFilterOperator extends BaseFilterOperator {
+  private static final String EXPLAIN_NAME = "FILTER_COMBINED";
+  private static final String OPERATOR_NAME = "CombinedFilterOperator";
+
+  private final BaseFilterOperator _mainFilterOperator;
+  private final BaseFilterOperator _subFilterOperator;
+
+  /**
+   * @param mainFilterOperator filter whose doc id set is converted into a non-scan-based set before combining
+   * @param subFilterOperator filter whose doc id set is ANDed with the main one as is
+   */
+  public CombinedFilterOperator(BaseFilterOperator mainFilterOperator, BaseFilterOperator subFilterOperator) {
+    _subFilterOperator = subFilterOperator;
+    _mainFilterOperator = mainFilterOperator;
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+    // Resolve (and pre-scan if needed) the main filter before touching the sub filter
+    FilterBlock mainFilterBlock = _mainFilterOperator.nextBlock();
+    FilterBlockDocIdSet mainDocIdSet = mainFilterBlock.getNonScanFilterBLockDocIdSet();
+    FilterBlockDocIdSet subDocIdSet = _subFilterOperator.nextBlock().getBlockDocIdSet();
+    return new FilterBlock(new AndDocIdSet(Arrays.asList(mainDocIdSet, subDocIdSet)));
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return Arrays.asList(_mainFilterOperator, _subFilterOperator);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
new file mode 100644
index 0000000..727a820
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.aggregation.AggregationExecutor;
+import org.apache.pinot.core.query.aggregation.DefaultAggregationExecutor;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+
+
+/**
+ * Operator for aggregation-only queries on a single segment where the aggregation functions are split into groups,
+ * each group being aggregated over the blocks of its own {@link TransformOperator} (presumably one per filter).
+ * <p>The results of every group are written back at the positions of the corresponding functions in the query's
+ * aggregation function array (matched by identity), so the returned block follows the order of that array.
+ */
+@SuppressWarnings("rawtypes")
+public class FilteredAggregationOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "FilteredAggregationOperator";
+  private static final String EXPLAIN_NAME = "FILTERED_AGGREGATE";
+
+  private final AggregationFunction[] _aggregationFunctions;
+  private final List<Pair<AggregationFunction[], TransformOperator>> _filteredAggregations;
+  private final long _numTotalDocs;
+
+  // Execution statistics accumulated over all the filtered aggregation groups
+  private long _numDocsScanned;
+  private long _numEntriesScannedInFilter;
+  private long _numEntriesScannedPostFilter;
+
+  /**
+   * @param aggregationFunctions all aggregation functions of the query, in result order
+   * @param filteredAggregations groups of aggregation functions, each paired with the transform operator feeding it;
+   *                             every function must be one of the instances in {@code aggregationFunctions}
+   * @param numTotalDocs total number of documents, reported as-is in the execution statistics
+   */
+  public FilteredAggregationOperator(AggregationFunction[] aggregationFunctions,
+      List<Pair<AggregationFunction[], TransformOperator>> filteredAggregations, long numTotalDocs) {
+    _aggregationFunctions = aggregationFunctions;
+    _filteredAggregations = filteredAggregations;
+    _numTotalDocs = numTotalDocs;
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    int numAggregations = _aggregationFunctions.length;
+    Object[] result = new Object[numAggregations];
+    // Result positions are looked up by identity: the groups are expected to reuse the query's function instances
+    IdentityHashMap<AggregationFunction, Integer> resultIndexMap = new IdentityHashMap<>(numAggregations);
+    for (int i = 0; i < numAggregations; i++) {
+      resultIndexMap.put(_aggregationFunctions[i], i);
+    }
+    for (Pair<AggregationFunction[], TransformOperator> filteredAggregation : _filteredAggregations) {
+      AggregationFunction[] aggregationFunctions = filteredAggregation.getLeft();
+      AggregationExecutor aggregationExecutor = new DefaultAggregationExecutor(aggregationFunctions);
+      TransformOperator transformOperator = filteredAggregation.getRight();
+      TransformBlock transformBlock;
+      int numDocsScanned = 0;
+      while ((transformBlock = transformOperator.nextBlock()) != null) {
+        aggregationExecutor.aggregate(transformBlock);
+        numDocsScanned += transformBlock.getNumDocs();
+      }
+      List<Object> filteredResult = aggregationExecutor.getResult();
+      int numFilteredAggregations = aggregationFunctions.length;
+      for (int i = 0; i < numFilteredAggregations; i++) {
+        Integer resultIndex = resultIndexMap.get(aggregationFunctions[i]);
+        if (resultIndex == null) {
+          // Fail with a descriptive error instead of an NPE from unboxing a missing index
+          throw new IllegalStateException(
+              "Filtered aggregation function not found in query aggregation functions: " + aggregationFunctions[i]);
+        }
+        result[resultIndex] = filteredResult.get(i);
+      }
+      _numDocsScanned += numDocsScanned;
+      _numEntriesScannedInFilter += transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+      _numEntriesScannedPostFilter += (long) numDocsScanned * transformOperator.getNumColumnsProjected();
+    }
+    return new IntermediateResultsBlock(_aggregationFunctions, Arrays.asList(result), false);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return _filteredAggregations.stream().map(Pair::getRight).collect(Collectors.toList());
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    return new ExecutionStatistics(_numDocsScanned, _numEntriesScannedInFilter, _numEntriesScannedPostFilter,
+        _numTotalDocs);
+  }
+
+  @Override
+  public String toExplainString() {
+    // TODO: Include the per-group aggregation functions and filters in the explain string
+    return EXPLAIN_NAME;
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org