You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/30 22:33:42 UTC

[pinot] 01/01: Aggregation Filter

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch aggregation_filter
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 9a2b052d63cbf3802cc24a40d2d9cefd5912cac4
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Thu Dec 30 23:32:33 2021 +0100

    Aggregation Filter
---
 .../pinot/core/operator/blocks/FilterBlock.java    |  15 ++-
 .../core/operator/docidsets/BitmapDocIdSet.java    |  12 ++-
 .../operator/docidsets/FilterBlockDocIdSet.java    |  40 ++++++++
 ...pDocIdSet.java => RangelessBitmapDocIdSet.java} |  20 ++--
 .../operator/filter/CombinedFilterOperator.java    |  66 ++++++++++++
 .../query/FilteredAggregationOperator.java         | 111 +++++++++++++++++++++
 6 files changed, 248 insertions(+), 16 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java
index 1f87255..aa97bc4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java
@@ -30,14 +30,25 @@ import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
  */
 public class FilterBlock implements Block {
   private final FilterBlockDocIdSet _filterBlockDocIdSet;
+  private FilterBlockDocIdSet _nonScanFilterBlockDocIdSet;
 
   public FilterBlock(FilterBlockDocIdSet filterBlockDocIdSet) {
     _filterBlockDocIdSet = filterBlockDocIdSet;
   }
 
+  /**
+   * Pre-scans the documents if needed, and returns a non-scan-based FilterBlockDocIdSet.
+   */
+  public FilterBlockDocIdSet getNonScanFilterBLockDocIdSet() {
+    if (_nonScanFilterBlockDocIdSet == null) {
+      _nonScanFilterBlockDocIdSet = _filterBlockDocIdSet.toNonScanDocIdSet();
+    }
+    return _nonScanFilterBlockDocIdSet;
+  }
+
   @Override
   public FilterBlockDocIdSet getBlockDocIdSet() {
-    return _filterBlockDocIdSet;
+    return _nonScanFilterBlockDocIdSet != null ? _nonScanFilterBlockDocIdSet : _filterBlockDocIdSet;
   }
 
   @Override
@@ -54,4 +65,4 @@ public class FilterBlock implements Block {
   public BlockMetadata getMetadata() {
     throw new UnsupportedOperationException();
   }
-}
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
index eacd4e3..a69ac00 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
@@ -23,17 +23,19 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 
 public class BitmapDocIdSet implements FilterBlockDocIdSet {
-  private final ImmutableRoaringBitmap _docIds;
-  private final int _numDocs;
+  private final BitmapDocIdIterator _iterator;
 
   public BitmapDocIdSet(ImmutableRoaringBitmap docIds, int numDocs) {
-    _docIds = docIds;
-    _numDocs = numDocs;
+    _iterator = new BitmapDocIdIterator(docIds, numDocs);
+  }
+
+  public BitmapDocIdSet(BitmapDocIdIterator iterator) {
+    _iterator = iterator;
   }
 
   @Override
   public BitmapDocIdIterator iterator() {
-    return new BitmapDocIdIterator(_docIds, _numDocs);
+    return _iterator;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java
index 92b6ac7..18dab2b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java
@@ -18,7 +18,16 @@
  */
 package org.apache.pinot.core.operator.docidsets;
 
+import org.apache.pinot.core.common.BlockDocIdIterator;
 import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.operator.dociditerators.AndDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.BitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.OrDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.segment.spi.Constants;
+import org.roaringbitmap.RoaringBitmapWriter;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 /**
@@ -32,4 +41,35 @@ public interface FilterBlockDocIdSet extends BlockDocIdSet {
    * filtering phase. This method should be called after the filtering is done.
    */
   long getNumEntriesScannedInFilter();
+
+  /**
+   * For scan-based FilterBlockDocIdSet, pre-scans the documents and returns a non-scan-based FilterBlockDocIdSet.
+   */
+  default FilterBlockDocIdSet toNonScanDocIdSet() {
+    BlockDocIdIterator docIdIterator = iterator();
+
+    // NOTE: AND and OR DocIdIterator might contain scan-based DocIdIterator
+    // TODO: This scan is not counted in the execution stats
+    if (docIdIterator instanceof ScanBasedDocIdIterator || docIdIterator instanceof AndDocIdIterator
+        || docIdIterator instanceof OrDocIdIterator) {
+      RoaringBitmapWriter<MutableRoaringBitmap> bitmapWriter =
+          RoaringBitmapWriter.bufferWriter().runCompress(false).get();
+      int docId;
+      while ((docId = docIdIterator.next()) != Constants.EOF) {
+        bitmapWriter.add(docId);
+      }
+      return new RangelessBitmapDocIdSet(bitmapWriter.get());
+    }
+
+    // NOTE: AND and OR DocIdSet might return BitmapBasedDocIdIterator after processing the iterators. Create a new
+    //       DocIdSet to prevent processing the iterators again
+    if (docIdIterator instanceof RangelessBitmapDocIdIterator) {
+      return new RangelessBitmapDocIdSet((RangelessBitmapDocIdIterator) docIdIterator);
+    }
+    if (docIdIterator instanceof BitmapDocIdIterator) {
+      return new BitmapDocIdSet((BitmapDocIdIterator) docIdIterator);
+    }
+
+    return this;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java
similarity index 66%
copy from pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
copy to pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java
index eacd4e3..463a2df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java
@@ -18,22 +18,24 @@
  */
 package org.apache.pinot.core.operator.docidsets;
 
-import org.apache.pinot.core.operator.dociditerators.BitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 
-public class BitmapDocIdSet implements FilterBlockDocIdSet {
-  private final ImmutableRoaringBitmap _docIds;
-  private final int _numDocs;
+public class RangelessBitmapDocIdSet implements FilterBlockDocIdSet {
+  private final RangelessBitmapDocIdIterator _iterator;
 
-  public BitmapDocIdSet(ImmutableRoaringBitmap docIds, int numDocs) {
-    _docIds = docIds;
-    _numDocs = numDocs;
+  public RangelessBitmapDocIdSet(ImmutableRoaringBitmap docIds) {
+    _iterator = new RangelessBitmapDocIdIterator(docIds);
+  }
+
+  public RangelessBitmapDocIdSet(RangelessBitmapDocIdIterator iterator) {
+    _iterator = iterator;
   }
 
   @Override
-  public BitmapDocIdIterator iterator() {
-    return new BitmapDocIdIterator(_docIds, _numDocs);
+  public RangelessBitmapDocIdIterator iterator() {
+    return _iterator;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
new file mode 100644
index 0000000..54c26dc
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.AndDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+
+
+/**
+ * A combined filter operator consisting of one main filter operator and one sub filter operator. The result block is
+ * the AND result of the main and sub filter.
+ */
+public class CombinedFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "CombinedFilterOperator";
+  private static final String EXPLAIN_NAME = "FILTER_COMBINED";
+
+  private final BaseFilterOperator _mainFilterOperator;
+  private final BaseFilterOperator _subFilterOperator;
+
+  public CombinedFilterOperator(BaseFilterOperator mainFilterOperator, BaseFilterOperator subFilterOperator) {
+    _mainFilterOperator = mainFilterOperator;
+    _subFilterOperator = subFilterOperator;
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return Arrays.asList(_mainFilterOperator, _subFilterOperator);
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+    FilterBlockDocIdSet mainFilterDocIdSet = _mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet();
+    FilterBlockDocIdSet subFilterDocIdSet = _subFilterOperator.nextBlock().getBlockDocIdSet();
+    return new FilterBlock(new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, subFilterDocIdSet)));
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
new file mode 100644
index 0000000..727a820
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.aggregation.AggregationExecutor;
+import org.apache.pinot.core.query.aggregation.DefaultAggregationExecutor;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+
+
+/**
+ * The <code>FilteredAggregationOperator</code> class is the operator for filtered aggregations on a single segment.
+ */
+@SuppressWarnings("rawtypes")
+public class FilteredAggregationOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "FilteredAggregationOperator";
+  private static final String EXPLAIN_NAME = "FILTERED_AGGREGATE";
+
+  private final AggregationFunction[] _aggregationFunctions;
+  private final List<Pair<AggregationFunction[], TransformOperator>> _filteredAggregations;
+  private final long _numTotalDocs;
+
+  private long _numDocsScanned;
+  private long _numEntriesScannedInFilter;
+  private long _numEntriesScannedPostFilter;
+
+  public FilteredAggregationOperator(AggregationFunction[] aggregationFunctions,
+      List<Pair<AggregationFunction[], TransformOperator>> filteredAggregations, long numTotalDocs) {
+    _aggregationFunctions = aggregationFunctions;
+    _filteredAggregations = filteredAggregations;
+    _numTotalDocs = numTotalDocs;
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    int numAggregations = _aggregationFunctions.length;
+    Object[] result = new Object[numAggregations];
+    IdentityHashMap<AggregationFunction, Integer> resultIndexMap = new IdentityHashMap<>(numAggregations);
+    for (int i = 0; i < numAggregations; i++) {
+      resultIndexMap.put(_aggregationFunctions[i], i);
+    }
+    for (Pair<AggregationFunction[], TransformOperator> filteredAggregation : _filteredAggregations) {
+      AggregationFunction[] aggregationFunctions = filteredAggregation.getLeft();
+      AggregationExecutor aggregationExecutor = new DefaultAggregationExecutor(aggregationFunctions);
+      TransformOperator transformOperator = filteredAggregation.getRight();
+      TransformBlock transformBlock;
+      int numDocsScanned = 0;
+      while ((transformBlock = transformOperator.nextBlock()) != null) {
+        aggregationExecutor.aggregate(transformBlock);
+        numDocsScanned += transformBlock.getNumDocs();
+      }
+      List<Object> filteredResult = aggregationExecutor.getResult();
+      int numFilteredAggregations = aggregationFunctions.length;
+      for (int i = 0; i < numFilteredAggregations; i++) {
+        result[resultIndexMap.get(aggregationFunctions[i])] = filteredResult.get(i);
+      }
+      _numDocsScanned += numDocsScanned;
+      _numEntriesScannedInFilter += transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+      _numEntriesScannedPostFilter += (long) numDocsScanned * transformOperator.getNumColumnsProjected();
+    }
+    return new IntermediateResultsBlock(_aggregationFunctions, Arrays.asList(result), false);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return _filteredAggregations.stream().map(Pair::getRight).collect(Collectors.toList());
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    return new ExecutionStatistics(_numDocsScanned, _numEntriesScannedInFilter, _numEntriesScannedPostFilter,
+        _numTotalDocs);
+  }
+
+  @Override
+  public String toExplainString() {
+    // TODO: Explain string is not implemented yet; this currently returns null
+    return null;
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org