You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/30 22:33:41 UTC

[pinot] branch aggregation_filter created (now 9a2b052)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch aggregation_filter
in repository https://gitbox.apache.org/repos/asf/pinot.git.


      at 9a2b052  Aggregation Filter

This branch includes the following new commits:

     new 9a2b052  Aggregation Filter

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Aggregation Filter

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch aggregation_filter
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 9a2b052d63cbf3802cc24a40d2d9cefd5912cac4
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Thu Dec 30 23:32:33 2021 +0100

    Aggregation Filter
---
 .../pinot/core/operator/blocks/FilterBlock.java    |  15 ++-
 .../core/operator/docidsets/BitmapDocIdSet.java    |  12 ++-
 .../operator/docidsets/FilterBlockDocIdSet.java    |  40 ++++++++
 ...pDocIdSet.java => RangelessBitmapDocIdSet.java} |  20 ++--
 .../operator/filter/CombinedFilterOperator.java    |  66 ++++++++++++
 .../query/FilteredAggregationOperator.java         | 111 +++++++++++++++++++++
 6 files changed, 248 insertions(+), 16 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java
index 1f87255..aa97bc4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java
@@ -30,14 +30,28 @@ import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
  */
 public class FilterBlock implements Block {
   private final FilterBlockDocIdSet _filterBlockDocIdSet;
+  // Lazily computed by getNonScanFilterBLockDocIdSet(); null until the first call
+  private FilterBlockDocIdSet _nonScanFilterBlockDocIdSet;
 
   public FilterBlock(FilterBlockDocIdSet filterBlockDocIdSet) {
     _filterBlockDocIdSet = filterBlockDocIdSet;
   }
 
+  /**
+   * Pre-scans the documents if needed, and returns a non-scan-based FilterBlockDocIdSet.
+   * <p>The result is computed on the first call and cached. Once computed, {@link #getBlockDocIdSet()} also returns
+   * the cached set instead of the original one.
+   */
+  public FilterBlockDocIdSet getNonScanFilterBLockDocIdSet() {
+    if (_nonScanFilterBlockDocIdSet == null) {
+      _nonScanFilterBlockDocIdSet = _filterBlockDocIdSet.toNonScanDocIdSet();
+    }
+    return _nonScanFilterBlockDocIdSet;
+  }
+
   @Override
   public FilterBlockDocIdSet getBlockDocIdSet() {
-    return _filterBlockDocIdSet;
+    return _nonScanFilterBlockDocIdSet != null ? _nonScanFilterBlockDocIdSet : _filterBlockDocIdSet;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
index eacd4e3..a69ac00 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
@@ -23,17 +23,27 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 
+/**
+ * A {@link FilterBlockDocIdSet} whose matching document ids come from a {@link BitmapDocIdIterator}.
+ * <p>NOTE: The iterator is created once, and {@link #iterator()} returns that same stateful instance on every call,
+ * so a second caller observes the iteration state left behind by the first one.
+ */
 public class BitmapDocIdSet implements FilterBlockDocIdSet {
-  private final ImmutableRoaringBitmap _docIds;
-  private final int _numDocs;
+  private final BitmapDocIdIterator _iterator;
 
   public BitmapDocIdSet(ImmutableRoaringBitmap docIds, int numDocs) {
-    _docIds = docIds;
-    _numDocs = numDocs;
+    _iterator = new BitmapDocIdIterator(docIds, numDocs);
+  }
+
+  /**
+   * Wraps an existing iterator so that it is reused rather than re-created.
+   */
+  public BitmapDocIdSet(BitmapDocIdIterator iterator) {
+    _iterator = iterator;
   }
 
   @Override
   public BitmapDocIdIterator iterator() {
-    return new BitmapDocIdIterator(_docIds, _numDocs);
+    return _iterator;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java
index 92b6ac7..18dab2b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java
@@ -18,7 +18,16 @@
  */
 package org.apache.pinot.core.operator.docidsets;
 
+import org.apache.pinot.core.common.BlockDocIdIterator;
 import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.operator.dociditerators.AndDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.BitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.OrDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.segment.spi.Constants;
+import org.roaringbitmap.RoaringBitmapWriter;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 /**
@@ -32,4 +41,40 @@ public interface FilterBlockDocIdSet extends BlockDocIdSet {
    * filtering phase. This method should be called after the filtering is done.
    */
   long getNumEntriesScannedInFilter();
+
+  /**
+   * Returns a FilterBlockDocIdSet whose iterator does not scan documents.
+   * <p>If {@link #iterator()} returns a scan-based iterator, or an AND/OR iterator (which might contain scan-based
+   * iterators), all the matching document ids are collected into a bitmap up front. If it returns a (rangeless)
+   * bitmap iterator, that iterator is wrapped as is. Otherwise this DocIdSet itself is returned.
+   * <p>NOTE: This method calls {@link #iterator()} and, in the first case, consumes the returned iterator completely.
+   * DocIdSets whose {@link #iterator()} returns the same instance on every call must not be iterated again.
+   */
+  default FilterBlockDocIdSet toNonScanDocIdSet() {
+    BlockDocIdIterator docIdIterator = iterator();
+
+    // NOTE: AND and OR DocIdIterator might contain scan-based DocIdIterator
+    // TODO: This scan is not counted in the execution stats
+    if (docIdIterator instanceof ScanBasedDocIdIterator || docIdIterator instanceof AndDocIdIterator
+        || docIdIterator instanceof OrDocIdIterator) {
+      RoaringBitmapWriter<MutableRoaringBitmap> bitmapWriter =
+          RoaringBitmapWriter.bufferWriter().runCompress(false).get();
+      int docId;
+      while ((docId = docIdIterator.next()) != Constants.EOF) {
+        bitmapWriter.add(docId);
+      }
+      return new RangelessBitmapDocIdSet(bitmapWriter.get());
+    }
+
+    // NOTE: AND and OR DocIdSet might return BitmapBasedDocIdIterator after processing the iterators. Create a new
+    //       DocIdSet to prevent processing the iterators again
+    if (docIdIterator instanceof RangelessBitmapDocIdIterator) {
+      return new RangelessBitmapDocIdSet((RangelessBitmapDocIdIterator) docIdIterator);
+    }
+    if (docIdIterator instanceof BitmapDocIdIterator) {
+      return new BitmapDocIdSet((BitmapDocIdIterator) docIdIterator);
+    }
+
+    return this;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java
similarity index 66%
copy from pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
copy to pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java
index eacd4e3..463a2df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java
@@ -18,22 +18,33 @@
  */
 package org.apache.pinot.core.operator.docidsets;
 
-import org.apache.pinot.core.operator.dociditerators.BitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 
-public class BitmapDocIdSet implements FilterBlockDocIdSet {
-  private final ImmutableRoaringBitmap _docIds;
-  private final int _numDocs;
+/**
+ * A {@link FilterBlockDocIdSet} whose matching document ids come from a {@link RangelessBitmapDocIdIterator}. Unlike
+ * {@link BitmapDocIdSet}, it is constructed without a {@code numDocs} argument.
+ * <p>NOTE: {@link #iterator()} returns the same stateful iterator instance on every call, so a second caller observes
+ * the iteration state left behind by the first one.
+ */
+public class RangelessBitmapDocIdSet implements FilterBlockDocIdSet {
+  private final RangelessBitmapDocIdIterator _iterator;
 
-  public BitmapDocIdSet(ImmutableRoaringBitmap docIds, int numDocs) {
-    _docIds = docIds;
-    _numDocs = numDocs;
+  public RangelessBitmapDocIdSet(ImmutableRoaringBitmap docIds) {
+    _iterator = new RangelessBitmapDocIdIterator(docIds);
+  }
+
+  /**
+   * Wraps an existing iterator so that it is reused rather than re-created.
+   */
+  public RangelessBitmapDocIdSet(RangelessBitmapDocIdIterator iterator) {
+    _iterator = iterator;
   }
 
   @Override
-  public BitmapDocIdIterator iterator() {
-    return new BitmapDocIdIterator(_docIds, _numDocs);
+  public RangelessBitmapDocIdIterator iterator() {
+    return _iterator;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
new file mode 100644
index 0000000..54c26dc
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.AndDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+
+
+/**
+ * A combined filter operator consisting of one main filter operator and one sub filter operator. The result block is
+ * the AND result of the main and sub filter.
+ * <p>Only the main filter result is converted to a non-scan-based DocIdSet (see
+ * {@link FilterBlock#getNonScanFilterBLockDocIdSet()}) before the AND; the sub filter result is used as is.
+ */
+public class CombinedFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "CombinedFilterOperator";
+  private static final String EXPLAIN_NAME = "FILTER_COMBINED";
+
+  private final BaseFilterOperator _mainFilterOperator;
+  private final BaseFilterOperator _subFilterOperator;
+
+  public CombinedFilterOperator(BaseFilterOperator mainFilterOperator, BaseFilterOperator subFilterOperator) {
+    _mainFilterOperator = mainFilterOperator;
+    _subFilterOperator = subFilterOperator;
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return Arrays.asList(_mainFilterOperator, _subFilterOperator);
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+    // Pre-scan the main filter into a non-scan-based DocIdSet (the result is cached inside its FilterBlock)
+    // NOTE(review): the cached result is only reused across operators sharing the main filter if its nextBlock()
+    //               returns the same FilterBlock on every call - confirm
+    FilterBlockDocIdSet mainFilterDocIdSet = _mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet();
+    FilterBlockDocIdSet subFilterDocIdSet = _subFilterOperator.nextBlock().getBlockDocIdSet();
+    return new FilterBlock(new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, subFilterDocIdSet)));
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
new file mode 100644
index 0000000..727a820
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.aggregation.AggregationExecutor;
+import org.apache.pinot.core.query.aggregation.DefaultAggregationExecutor;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+
+
+/**
+ * The <code>FilteredAggregationOperator</code> class provides the operator for aggregation only query with filtered
+ * aggregations on a single segment.
+ * <p>Each filtered aggregation pairs a group of aggregation functions with the {@link TransformOperator} that feeds
+ * them. The results of all the groups are merged into one result list that follows the order of the aggregation
+ * functions passed to the constructor, where each function is matched to its slot by object identity.
+ */
+@SuppressWarnings("rawtypes")
+public class FilteredAggregationOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "FilteredAggregationOperator";
+  private static final String EXPLAIN_NAME = "FILTERED_AGGREGATE";
+
+  private final AggregationFunction[] _aggregationFunctions;
+  private final List<Pair<AggregationFunction[], TransformOperator>> _filteredAggregations;
+  private final long _numTotalDocs;
+
+  private long _numDocsScanned;
+  private long _numEntriesScannedInFilter;
+  private long _numEntriesScannedPostFilter;
+
+  /**
+   * @param aggregationFunctions all the aggregation functions, in the order of the final result
+   * @param filteredAggregations groups of aggregation functions, each paired with the transform operator feeding it;
+   *                             every function must be one of the instances in {@code aggregationFunctions}
+   * @param numTotalDocs number of total documents to report in the execution statistics
+   */
+  public FilteredAggregationOperator(AggregationFunction[] aggregationFunctions,
+      List<Pair<AggregationFunction[], TransformOperator>> filteredAggregations, long numTotalDocs) {
+    _aggregationFunctions = aggregationFunctions;
+    _filteredAggregations = filteredAggregations;
+    _numTotalDocs = numTotalDocs;
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    int numAggregations = _aggregationFunctions.length;
+    Object[] result = new Object[numAggregations];
+    // Map each aggregation function instance to its index in the result (by identity, not equals())
+    IdentityHashMap<AggregationFunction, Integer> resultIndexMap = new IdentityHashMap<>(numAggregations);
+    for (int i = 0; i < numAggregations; i++) {
+      resultIndexMap.put(_aggregationFunctions[i], i);
+    }
+    for (Pair<AggregationFunction[], TransformOperator> filteredAggregation : _filteredAggregations) {
+      AggregationFunction[] aggregationFunctions = filteredAggregation.getLeft();
+      AggregationExecutor aggregationExecutor = new DefaultAggregationExecutor(aggregationFunctions);
+      TransformOperator transformOperator = filteredAggregation.getRight();
+      TransformBlock transformBlock;
+      long numDocsScanned = 0;
+      // Drain the transform operator of this group, aggregating every block it produces
+      while ((transformBlock = transformOperator.nextBlock()) != null) {
+        aggregationExecutor.aggregate(transformBlock);
+        numDocsScanned += transformBlock.getNumDocs();
+      }
+      // Scatter the results of this group into their slots in the final result
+      List<Object> filteredResult = aggregationExecutor.getResult();
+      int numFilteredAggregations = aggregationFunctions.length;
+      for (int i = 0; i < numFilteredAggregations; i++) {
+        Integer resultIndex = resultIndexMap.get(aggregationFunctions[i]);
+        if (resultIndex == null) {
+          throw new IllegalStateException("Aggregation function at index: " + i
+              + " of a filtered aggregation is not one of the aggregation functions of the operator");
+        }
+        result[resultIndex] = filteredResult.get(i);
+      }
+      _numDocsScanned += numDocsScanned;
+      _numEntriesScannedInFilter += transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+      _numEntriesScannedPostFilter += numDocsScanned * transformOperator.getNumColumnsProjected();
+    }
+    return new IntermediateResultsBlock(_aggregationFunctions, Arrays.asList(result), false);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return _filteredAggregations.stream().map(Pair::getRight).collect(Collectors.toList());
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    return new ExecutionStatistics(_numDocsScanned, _numEntriesScannedInFilter, _numEntriesScannedPostFilter,
+        _numTotalDocs);
+  }
+
+  @Override
+  public String toExplainString() {
+    // TODO: Add the details of the filtered aggregations
+    return EXPLAIN_NAME;
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org