You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/04/18 06:48:17 UTC

[GitHub] [incubator-druid] clintropolis commented on a change in pull request #7496: refactor druid-bloom-filter aggregators

clintropolis commented on a change in pull request #7496: refactor druid-bloom-filter aggregators
URL: https://github.com/apache/incubator-druid/pull/7496#discussion_r276535379
 
 

 ##########
 File path: extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java
 ##########
 @@ -20,20 +20,116 @@
 package org.apache.druid.query.aggregation.bloom;
 
 import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.filter.BloomKFilter;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseNullableColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
 
-public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector> implements Aggregator
+/**
+ * All bloom filter aggregations are done using {@link ByteBuffer}, so this base class implements both {@link Aggregator}
+ * and {@link BufferAggregator}.
+ *
+ * If used as an {@link Aggregator} the caller MUST specify the 'onHeap' parameter in the
+ * constructor as "true", or else the "collector" will not be allocated and null pointer exceptions will make things sad.
+ *
+ * If used as a {@link BufferAggregator}, the "collector" buffer is not necessary, and should be called with "false",
+ * but at least nothing dramatic will happen like incorrect use in the {@link Aggregator} case.
+ *
+ * {@link BloomFilterAggregatorFactory} and {@link BloomFilterMergeAggregatorFactory}, which should be the creators of
+ * all implementations of {@link BaseBloomFilterAggregator} outside of tests, should be sure to set the 'onHeap' value
+ * to "true" and "false" respectively for
+ * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and
+ * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered}
+ *
+ * @param <TSelector> type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values
+ *                  to add to a bloom filter, or other bloom filters to merge into this bloom filter.
+ */
+public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector>
+    implements BufferAggregator, Aggregator
 {
-  final BloomKFilter collector;
+
+  protected final ByteBuffer collector;
+  protected final int maxNumEntries;
   protected final TSelector selector;
 
-  BaseBloomFilterAggregator(TSelector selector, BloomKFilter collector)
+  /**
+   * @param selector selector that feeds values to the aggregator
+   * @param maxNumEntries maximum number of entries that can be added to a bloom filter before accuracy degrades rapidly
+   * @param onHeap allocate a ByteBuffer "collector" to use as an {@link Aggregator}
+   */
+  BaseBloomFilterAggregator(TSelector selector, int maxNumEntries, boolean onHeap)
   {
-    this.collector = collector;
     this.selector = selector;
+    this.maxNumEntries = maxNumEntries;
+    if (onHeap) {
+      BloomKFilter bloomFilter = new BloomKFilter(maxNumEntries);
+      this.collector = ByteBuffer.allocate(BloomKFilter.computeSizeBytes(maxNumEntries));
+      BloomKFilter.serialize(collector, bloomFilter);
+    } else {
+      collector = null;
+    }
+  }
+
+  abstract void bufferAdd(ByteBuffer buf);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    final ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    BloomKFilter filter = new BloomKFilter(maxNumEntries);
+    BloomKFilter.serialize(mutationBuffer, filter);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    final int oldPosition = buf.position();
+    buf.position(position);
+    bufferAdd(buf);
+    buf.position(oldPosition);
+  }
+
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    // | k (byte) | numLongs (int) | bitset (long[numLongs]) |
 
 Review comment:
   Oops, yes, I switched this method to use `computeSizeBytes` and forgot to remove the comment (previously it was calculating the size by hand).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org