You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/04/18 18:54:13 UTC

[incubator-druid] branch master updated: refactor druid-bloom-filter aggregators (#7496)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new be65cca  refactor druid-bloom-filter aggregators (#7496)
be65cca is described below

commit be65cca248d7b3dd5ebacc2a537aa3d25d420475
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Thu Apr 18 11:54:06 2019 -0700

    refactor druid-bloom-filter aggregators (#7496)
    
    * now with 100% more buffer
    
    * there can be only 1
    
    * simplify
    
    * javadoc
    
    * clean up unused test method
    
    * fix exception message
    
    * style
    
    * why does style hate javadocs
    
    * review stuff
    
    * style :(
---
 .../bloom/BaseBloomFilterAggregator.java           | 113 ++++++++++++++++++++-
 .../bloom/BaseBloomFilterBufferAggregator.java     | 105 -------------------
 .../bloom/BloomFilterAggregateCombiner.java        |  72 -------------
 .../bloom/BloomFilterAggregatorFactory.java        |  82 +++++++++------
 .../bloom/BloomFilterMergeAggregator.java          |  32 ++----
 .../bloom/BloomFilterMergeAggregatorFactory.java   |  25 +++--
 .../bloom/BloomFilterMergeBufferAggregator.java    |  40 --------
 .../bloom/DoubleBloomFilterAggregator.java         |  12 ++-
 .../bloom/DoubleBloomFilterBufferAggregator.java   |  44 --------
 .../bloom/FloatBloomFilterAggregator.java          |  12 ++-
 .../bloom/FloatBloomFilterBufferAggregator.java    |  44 --------
 .../bloom/LongBloomFilterAggregator.java           |  12 ++-
 .../bloom/LongBloomFilterBufferAggregator.java     |  44 --------
 .../bloom/NoopBloomFilterAggregator.java           |  19 +++-
 .../bloom/NoopBloomFilterBufferAggregator.java     |  44 --------
 .../bloom/StringBloomFilterAggregator.java         |  18 ++--
 .../bloom/StringBloomFilterBufferAggregator.java   |  56 ----------
 .../bloom/BloomFilterAggregatorTest.java           | 104 +++++++++++--------
 .../bloom/BloomFilterGroupByQueryTest.java         |  10 +-
 .../bloom/sql/BloomFilterSqlAggregatorTest.java    |   1 -
 .../druid/sql/calcite/util/CalciteTests.java       |   2 -
 21 files changed, 294 insertions(+), 597 deletions(-)

diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java
index 652236b..48ba083 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java
@@ -20,20 +20,119 @@
 package org.apache.druid.query.aggregation.bloom;
 
 import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.filter.BloomKFilter;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseNullableColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
 
-public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector> implements Aggregator
+/**
+ * All bloom filter aggregations are done using {@link ByteBuffer}, so this base class implements both {@link Aggregator}
+ * and {@link BufferAggregator}.
+ *
+ * If used as an {@link Aggregator} the caller MUST specify the 'onHeap' parameter in the
+ * constructor as "true", or else the "collector" will not be allocated and null pointer exceptions will make things sad.
+ *
+ * If used as a {@link BufferAggregator}, the "collector" buffer is not necessary, so 'onHeap' should be "false";
+ * passing "true" only allocates an unused buffer, unlike a "false" in the {@link Aggregator} case, which fails.
+ *
+ * {@link BloomFilterAggregatorFactory} and {@link BloomFilterMergeAggregatorFactory}, which should be the creators of
+ * all implementations of {@link BaseBloomFilterAggregator} outside of tests, should be sure to set the 'onHeap' value
+ * to "true" and "false" respectively for
+ * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and
+ * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered}.
+ *
+ * @param <TSelector> type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values
+ *                  to add to a bloom filter, or other bloom filters to merge into this bloom filter.
+ */
+public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector>
+    implements BufferAggregator, Aggregator
 {
-  final BloomKFilter collector;
+  @Nullable
+  private final ByteBuffer collector;
+  protected final int maxNumEntries;
   protected final TSelector selector;
 
-  BaseBloomFilterAggregator(TSelector selector, BloomKFilter collector)
+  /**
+   * @param selector selector that feeds values to the aggregator
+   * @param maxNumEntries maximum number of entries that can be added to a bloom filter before accuracy degrades rapidly
+   * @param onHeap allocate a ByteBuffer "collector" to use as an {@link Aggregator}
+   */
+  BaseBloomFilterAggregator(TSelector selector, int maxNumEntries, boolean onHeap)
   {
-    this.collector = collector;
     this.selector = selector;
+    this.maxNumEntries = maxNumEntries;
+    if (onHeap) {
+      BloomKFilter bloomFilter = new BloomKFilter(maxNumEntries);
+      this.collector = ByteBuffer.allocate(BloomKFilter.computeSizeBytes(maxNumEntries));
+      BloomKFilter.serialize(collector, bloomFilter);
+    } else {
+      collector = null;
+    }
+  }
+
+  abstract void bufferAdd(ByteBuffer buf);
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    final ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    BloomKFilter filter = new BloomKFilter(maxNumEntries);
+    BloomKFilter.serialize(mutationBuffer, filter);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    final int oldPosition = buf.position();
+    try {
+      buf.position(position);
+      bufferAdd(buf);
+    }
+    finally {
+      buf.position(oldPosition);
+    }
+  }
+
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+    int sizeBytes = BloomKFilter.computeSizeBytes(maxNumEntries);
+    mutationBuffer.limit(position + sizeBytes);
+
+    ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes);
+    resultCopy.put(mutationBuffer.slice());
+    resultCopy.rewind();
+    return resultCopy;
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("BloomFilterAggregator does not support getFloat()");
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("BloomFilterAggregator does not support getLong()");
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("BloomFilterAggregator does not support getDouble()");
+  }
+
+  @Override
+  public void aggregate()
+  {
+    aggregate(collector, 0);
   }
 
   @Nullable
@@ -66,4 +165,10 @@ public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableCo
   {
     // nothing to close
   }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("selector", selector);
+  }
 }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java
deleted file mode 100644
index ff866f9..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import org.apache.druid.segment.BaseNullableColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public abstract class BaseBloomFilterBufferAggregator<TSelector extends BaseNullableColumnValueSelector> implements BufferAggregator
-{
-  protected final int maxNumEntries;
-  protected final TSelector selector;
-
-  BaseBloomFilterBufferAggregator(TSelector selector, int maxNumEntries)
-  {
-    this.selector = selector;
-    this.maxNumEntries = maxNumEntries;
-  }
-
-  abstract void bufferAdd(ByteBuffer buf);
-
-  @Override
-  public void init(ByteBuffer buf, int position)
-  {
-    final ByteBuffer mutationBuffer = buf.duplicate();
-    mutationBuffer.position(position);
-    BloomKFilter filter = new BloomKFilter(maxNumEntries);
-    BloomKFilter.serialize(mutationBuffer, filter);
-  }
-
-  @Override
-  public void aggregate(ByteBuffer buf, int position)
-  {
-    final int oldPosition = buf.position();
-    buf.position(position);
-    bufferAdd(buf);
-    buf.position(oldPosition);
-  }
-
-
-  @Override
-  public Object get(ByteBuffer buf, int position)
-  {
-    ByteBuffer mutationBuffer = buf.duplicate();
-    mutationBuffer.position(position);
-    // | k (byte) | numLongs (int) | bitset (long[numLongs]) |
-    int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES);
-    mutationBuffer.limit(position + sizeBytes);
-
-    ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes);
-    resultCopy.put(mutationBuffer.slice());
-    resultCopy.rewind();
-    return resultCopy;
-  }
-
-  @Override
-  public float getFloat(ByteBuffer buf, int position)
-  {
-    throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getFloat()");
-  }
-
-  @Override
-  public long getLong(ByteBuffer buf, int position)
-  {
-    throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getLong()");
-  }
-
-  @Override
-  public double getDouble(ByteBuffer buf, int position)
-  {
-    throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()");
-  }
-
-  @Override
-  public void close()
-  {
-    // nothing to close
-  }
-
-  @Override
-  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-  {
-    inspector.visit("selector", selector);
-  }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java
deleted file mode 100644
index 6fc4bf9..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.ColumnValueSelector;
-
-import javax.annotation.Nullable;
-
-public class BloomFilterAggregateCombiner extends ObjectAggregateCombiner<BloomKFilter>
-{
-  @Nullable
-  private BloomKFilter combined;
-
-  private final int maxNumEntries;
-
-  public BloomFilterAggregateCombiner(int maxNumEntries)
-  {
-    this.maxNumEntries = maxNumEntries;
-  }
-
-  @Override
-  public void reset(ColumnValueSelector selector)
-  {
-    combined = null;
-    fold(selector);
-  }
-
-  @Override
-  public void fold(ColumnValueSelector selector)
-  {
-    BloomKFilter other = (BloomKFilter) selector.getObject();
-    if (other == null) {
-      return;
-    }
-    if (combined == null) {
-      combined = new BloomKFilter(maxNumEntries);
-    }
-    combined.merge(other);
-  }
-
-  @Nullable
-  @Override
-  public BloomKFilter getObject()
-  {
-    return combined;
-  }
-
-  @Override
-  public Class<? extends BloomKFilter> classOfObject()
-  {
-    return BloomKFilter.class;
-  }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
index 42d379e..74e921c 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
@@ -40,7 +40,6 @@ import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Comparator;
@@ -59,10 +58,6 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
           BloomKFilter.getNumSetBits(buf1, buf1.position()),
           BloomKFilter.getNumSetBits(buf2, buf2.position())
       );
-    } else if (o1 instanceof BloomKFilter && o2 instanceof BloomKFilter) {
-      BloomKFilter o1f = (BloomKFilter) o1;
-      BloomKFilter o2f = (BloomKFilter) o2;
-      return Integer.compare(o1f.getNumSetBits(), o2f.getNumSetBits());
     } else {
       throw new RE("Unable to compare unexpected types [%s]", o1.getClass().getName());
     }
@@ -87,14 +82,13 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
   @Override
   public Aggregator factorize(ColumnSelectorFactory columnFactory)
   {
-    BloomKFilter filter = new BloomKFilter(maxNumEntries);
     ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
 
     if (capabilities == null) {
       BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
       if (selector instanceof NilColumnValueSelector) {
         // BloomKFilter must be the same size so we cannot use a constant for the empty agg
-        return new NoopBloomFilterAggregator(filter);
+        return new NoopBloomFilterAggregator(maxNumEntries, true);
       }
       throw new IAE(
           "Cannot create bloom filter buffer aggregator for column selector type [%s]",
@@ -104,13 +98,29 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
     ValueType type = capabilities.getType();
     switch (type) {
       case STRING:
-        return new StringBloomFilterAggregator(columnFactory.makeDimensionSelector(field), filter);
+        return new StringBloomFilterAggregator(
+            columnFactory.makeDimensionSelector(field),
+            maxNumEntries,
+            true
+        );
       case LONG:
-        return new LongBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter);
+        return new LongBloomFilterAggregator(
+            columnFactory.makeColumnValueSelector(field.getDimension()),
+            maxNumEntries,
+            true
+        );
       case FLOAT:
-        return new FloatBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter);
+        return new FloatBloomFilterAggregator(
+            columnFactory.makeColumnValueSelector(field.getDimension()),
+            maxNumEntries,
+            true
+        );
       case DOUBLE:
-        return new DoubleBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter);
+        return new DoubleBloomFilterAggregator(
+            columnFactory.makeColumnValueSelector(field.getDimension()),
+            maxNumEntries,
+            true
+        );
       default:
         throw new IAE("Cannot create bloom filter aggregator for invalid column type [%s]", type);
     }
@@ -124,7 +134,7 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
     if (capabilities == null) {
       BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
       if (selector instanceof NilColumnValueSelector) {
-        return new NoopBloomFilterBufferAggregator(maxNumEntries);
+        return new NoopBloomFilterAggregator(maxNumEntries, false);
       }
       throw new IAE(
           "Cannot create bloom filter buffer aggregator for column selector type [%s]",
@@ -135,18 +145,28 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
     ValueType type = capabilities.getType();
     switch (type) {
       case STRING:
-        return new StringBloomFilterBufferAggregator(columnFactory.makeDimensionSelector(field), maxNumEntries);
+        return new StringBloomFilterAggregator(
+            columnFactory.makeDimensionSelector(field),
+            maxNumEntries,
+            false
+        );
       case LONG:
-        return new LongBloomFilterBufferAggregator(
-            columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries
+        return new LongBloomFilterAggregator(
+            columnFactory.makeColumnValueSelector(field.getDimension()),
+            maxNumEntries,
+            false
         );
       case FLOAT:
-        return new FloatBloomFilterBufferAggregator(
-            columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries
+        return new FloatBloomFilterAggregator(
+            columnFactory.makeColumnValueSelector(field.getDimension()),
+            maxNumEntries,
+            false
         );
       case DOUBLE:
-        return new DoubleBloomFilterBufferAggregator(
-            columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries
+        return new DoubleBloomFilterAggregator(
+            columnFactory.makeColumnValueSelector(field.getDimension()),
+            maxNumEntries,
+            false
         );
       default:
         throw new IAE("Cannot create bloom filter buffer aggregator for invalid column type [%s]", type);
@@ -168,14 +188,19 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
     if (lhs == null) {
       return rhs;
     }
-    ((BloomKFilter) lhs).merge((BloomKFilter) rhs);
+    BloomKFilter.mergeBloomFilterByteBuffers(
+        (ByteBuffer) lhs,
+        ((ByteBuffer) lhs).position(),
+        (ByteBuffer) rhs,
+        ((ByteBuffer) rhs).position()
+    );
     return lhs;
   }
 
   @Override
   public AggregateCombiner makeAggregateCombiner()
   {
-    return new BloomFilterAggregateCombiner(maxNumEntries);
+    throw new UnsupportedOperationException("Bloom filter aggregators are query-time only");
   }
 
   @Override
@@ -195,6 +220,8 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
   {
     if (object instanceof String) {
       return ByteBuffer.wrap(StringUtils.decodeBase64String((String) object));
+    } else if (object instanceof byte[]) {
+      return ByteBuffer.wrap((byte[]) object);
     } else {
       return object;
     }
@@ -203,18 +230,7 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
   @Override
   public Object finalizeComputation(Object object)
   {
-    try {
-      if (object instanceof ByteBuffer) {
-        return BloomKFilter.deserialize((ByteBuffer) object);
-      } else if (object instanceof byte[]) {
-        return BloomKFilter.deserialize(ByteBuffer.wrap((byte[]) object));
-      } else {
-        return object;
-      }
-    }
-    catch (IOException ioe) {
-      throw new RuntimeException("Failed to deserialize BloomKFilter", ioe);
-    }
+    return object;
   }
 
   @JsonProperty
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
index 67d7a70..011f2f6 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
@@ -19,39 +19,27 @@
 
 package org.apache.druid.query.aggregation.bloom;
 
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.segment.ColumnValueSelector;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<Object>>
+public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<ByteBuffer>>
 {
-  public BloomFilterMergeAggregator(ColumnValueSelector<Object> selector, BloomKFilter collector)
+  public BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
   {
-    super(selector, collector);
+    super(selector, maxNumEntries, onHeap);
   }
 
   @Override
-  public void aggregate()
+  public void bufferAdd(ByteBuffer buf)
   {
-    Object other = selector.getObject();
-    if (other != null) {
-      if (other instanceof BloomKFilter) {
-        collector.merge((BloomKFilter) other);
-      } else if (other instanceof ByteBuffer) {
-        // fun fact: because bloom filter agg factory deserialize returns a byte buffer to avoid unnecessary serde,
-        // but GroupByQueryEngine (group by v1) ends up trying to merge ByteBuffers from buffer aggs with this agg
-        // instead of the BloomFilterBufferMergeAggregator. fun! Also, it requires a 'ComplexMetricSerde' to be
-        // registered even for query time only aggs, but then never uses it. also fun!
-        try {
-          BloomKFilter otherFilter = BloomKFilter.deserialize((ByteBuffer) other);
-          collector.merge(otherFilter);
-        }
-        catch (IOException ioe) {
-          throw new RuntimeException("Failed to deserialize BloomKFilter", ioe);
-        }
-      }
+    ByteBuffer other = selector.getObject();
+    if (other == null) {
+      // nulls should be empty bloom filters by this point, so encountering a null value in merge agg is unexpected
+      throw new ISE("WTF?! Unexpected null value in BloomFilterMergeAggregator");
     }
+    BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position());
   }
 }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java
index 8dab867..ed5ce29 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java
@@ -25,7 +25,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.segment.BaseNullableColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
@@ -48,23 +47,13 @@ public class BloomFilterMergeAggregatorFactory extends BloomFilterAggregatorFact
   @Override
   public Aggregator factorize(final ColumnSelectorFactory metricFactory)
   {
-    final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
-    // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected
-    if (selector instanceof NilColumnValueSelector) {
-      throw new ISE("WTF?! Unexpected NilColumnValueSelector");
-    }
-    return new BloomFilterMergeAggregator((ColumnValueSelector<Object>) selector, new BloomKFilter(getMaxNumEntries()));
+    return makeMergeAggregator(metricFactory);
   }
 
   @Override
   public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory)
   {
-    final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
-    // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected
-    if (selector instanceof NilColumnValueSelector) {
-      throw new ISE("WTF?! Unexpected NilColumnValueSelector");
-    }
-    return new BloomFilterMergeBufferAggregator((ColumnValueSelector<ByteBuffer>) selector, getMaxNumEntries());
+    return makeMergeAggregator(metricFactory);
   }
 
   @Override
@@ -81,4 +70,14 @@ public class BloomFilterMergeAggregatorFactory extends BloomFilterAggregatorFact
         .appendInt(getMaxNumEntries())
         .build();
   }
+
+  private BloomFilterMergeAggregator makeMergeAggregator(ColumnSelectorFactory metricFactory)
+  {
+    final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+    // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected
+    if (selector instanceof NilColumnValueSelector) {
+      throw new ISE("WTF?! Unexpected NilColumnValueSelector");
+    }
+    return new BloomFilterMergeAggregator((ColumnValueSelector<ByteBuffer>) selector, getMaxNumEntries(), true);
+  }
 }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java
deleted file mode 100644
index 026a23e..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.ColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public final class BloomFilterMergeBufferAggregator extends BaseBloomFilterBufferAggregator<ColumnValueSelector<ByteBuffer>>
-{
-  public BloomFilterMergeBufferAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries)
-  {
-    super(selector, maxNumEntries);
-  }
-
-  @Override
-  public void bufferAdd(ByteBuffer buf)
-  {
-    ByteBuffer other = selector.getObject();
-    BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position());
-  }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java
index dfdae6c..8aa899e 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java
@@ -23,20 +23,22 @@ import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 
+import java.nio.ByteBuffer;
+
 public final class DoubleBloomFilterAggregator extends BaseBloomFilterAggregator<BaseDoubleColumnValueSelector>
 {
-  DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, BloomKFilter collector)
+  DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries, boolean onHeap)
   {
-    super(selector, collector);
+    super(selector, maxNumEntries, onHeap);
   }
 
   @Override
-  public void aggregate()
+  public void bufferAdd(ByteBuffer buf)
   {
     if (NullHandling.replaceWithDefault() || !selector.isNull()) {
-      collector.addDouble(selector.getDouble());
+      BloomKFilter.addDouble(buf, selector.getDouble());
     } else {
-      collector.addBytes(null, 0, 0);
+      BloomKFilter.addBytes(buf, null, 0, 0);
     }
   }
 }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java
deleted file mode 100644
index e84b9fc..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public final class DoubleBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<BaseDoubleColumnValueSelector>
-{
-  DoubleBloomFilterBufferAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries)
-  {
-    super(selector, maxNumEntries);
-  }
-
-  @Override
-  public void bufferAdd(ByteBuffer buf)
-  {
-    if (NullHandling.replaceWithDefault() || !selector.isNull()) {
-      BloomKFilter.addDouble(buf, selector.getDouble());
-    } else {
-      BloomKFilter.addBytes(buf, null, 0, 0);
-    }
-  }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java
index ae53d16..0a7c042 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java
@@ -23,20 +23,22 @@ import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
 
+import java.nio.ByteBuffer;
+
 public final class FloatBloomFilterAggregator extends BaseBloomFilterAggregator<BaseFloatColumnValueSelector>
 {
-  FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, BloomKFilter collector)
+  FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries, boolean onHeap)
   {
-    super(selector, collector);
+    super(selector, maxNumEntries, onHeap);
   }
 
   @Override
-  public void aggregate()
+  public void bufferAdd(ByteBuffer buf)
   {
     if (NullHandling.replaceWithDefault() || !selector.isNull()) {
-      collector.addFloat(selector.getFloat());
+      BloomKFilter.addFloat(buf, selector.getFloat());
     } else {
-      collector.addBytes(null, 0, 0);
+      BloomKFilter.addBytes(buf, null, 0, 0);
     }
   }
 }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java
deleted file mode 100644
index 27e88d4..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.BaseFloatColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public final class FloatBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<BaseFloatColumnValueSelector>
-{
-  FloatBloomFilterBufferAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries)
-  {
-    super(selector, maxNumEntries);
-  }
-
-  @Override
-  public void bufferAdd(ByteBuffer buf)
-  {
-    if (NullHandling.replaceWithDefault() || !selector.isNull()) {
-      BloomKFilter.addFloat(buf, selector.getFloat());
-    } else {
-      BloomKFilter.addBytes(buf, null, 0, 0);
-    }
-  }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java
index caa4739..3e232ce 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java
@@ -23,20 +23,22 @@ import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 
+import java.nio.ByteBuffer;
+
 public final class LongBloomFilterAggregator extends BaseBloomFilterAggregator<BaseLongColumnValueSelector>
 {
-  LongBloomFilterAggregator(BaseLongColumnValueSelector selector, BloomKFilter collector)
+  LongBloomFilterAggregator(BaseLongColumnValueSelector selector, int maxNumEntries, boolean onHeap)
   {
-    super(selector, collector);
+    super(selector, maxNumEntries, onHeap);
   }
 
   @Override
-  public void aggregate()
+  public void bufferAdd(ByteBuffer buf)
   {
     if (NullHandling.replaceWithDefault() || !selector.isNull()) {
-      collector.addLong(selector.getLong());
+      BloomKFilter.addLong(buf, selector.getLong());
     } else {
-      collector.addBytes(null, 0, 0);
+      BloomKFilter.addBytes(buf, null, 0, 0);
     }
   }
 }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java
deleted file mode 100644
index 13a6634..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.BaseLongColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public final class LongBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<BaseLongColumnValueSelector>
-{
-  LongBloomFilterBufferAggregator(BaseLongColumnValueSelector selector, int maxNumEntries)
-  {
-    super(selector, maxNumEntries);
-  }
-
-  @Override
-  public void bufferAdd(ByteBuffer buf)
-  {
-    if (NullHandling.replaceWithDefault() || !selector.isNull()) {
-      BloomKFilter.addLong(buf, selector.getLong());
-    } else {
-      BloomKFilter.addBytes(buf, null, 0, 0);
-    }
-  }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java
index bc4d429..ec23df5 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java
@@ -19,14 +19,27 @@
 
 package org.apache.druid.query.aggregation.bloom;
 
-import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.segment.NilColumnValueSelector;
 
+import java.nio.ByteBuffer;
+
 public final class NoopBloomFilterAggregator extends BaseBloomFilterAggregator<NilColumnValueSelector>
 {
-  NoopBloomFilterAggregator(BloomKFilter collector)
+  NoopBloomFilterAggregator(int maxNumEntries, boolean onHeap)
+  {
+    super(NilColumnValueSelector.instance(), maxNumEntries, onHeap);
+  }
+
+  @Override
+  public void bufferAdd(ByteBuffer buf)
   {
-    super(NilColumnValueSelector.instance(), collector);
+    // nothing to do
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    // nothing to do
   }
 
   @Override
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterBufferAggregator.java
deleted file mode 100644
index 6a71d4c..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.segment.NilColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public final class NoopBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<NilColumnValueSelector>
-{
-  NoopBloomFilterBufferAggregator(int maxNumEntries)
-  {
-    super(NilColumnValueSelector.instance(), maxNumEntries);
-  }
-
-  @Override
-  public void bufferAdd(ByteBuffer buf)
-  {
-    // nothing to do
-  }
-
-  @Override
-  public void aggregate(ByteBuffer buf, int position)
-  {
-    // nothing to do
-  }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
index 351ef84..f3f6dae 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
@@ -22,32 +22,34 @@ package org.apache.druid.query.aggregation.bloom;
 import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.segment.DimensionSelector;
 
+import java.nio.ByteBuffer;
+
 public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator<DimensionSelector>
 {
-  StringBloomFilterAggregator(DimensionSelector selector, BloomKFilter collector)
+
+  StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries, boolean onHeap)
   {
-    super(selector, collector);
+    super(selector, maxNumEntries, onHeap);
   }
 
   @Override
-  public void aggregate()
+  public void bufferAdd(ByteBuffer buf)
   {
-    // note: there might be room for optimization here but behavior must match BloomDimFilter implementation
     if (selector.getRow().size() > 1) {
       selector.getRow().forEach(v -> {
         String value = selector.lookupName(v);
         if (value == null) {
-          collector.addBytes(null, 0, 0);
+          BloomKFilter.addBytes(buf, null, 0, 0);
         } else {
-          collector.addString(value);
+          BloomKFilter.addString(buf, value);
         }
       });
     } else {
       String value = (String) selector.getObject();
       if (value == null) {
-        collector.addBytes(null, 0, 0);
+        BloomKFilter.addBytes(buf, null, 0, 0);
       } else {
-        collector.addString(value);
+        BloomKFilter.addString(buf, value);
       }
     }
   }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java
deleted file mode 100644
index c7c17c9..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.DimensionSelector;
-
-import java.nio.ByteBuffer;
-
-public final class StringBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<DimensionSelector>
-{
-
-  StringBloomFilterBufferAggregator(DimensionSelector selector, int maxNumEntries)
-  {
-    super(selector, maxNumEntries);
-  }
-
-  @Override
-  public void bufferAdd(ByteBuffer buf)
-  {
-    if (selector.getRow().size() > 1) {
-      selector.getRow().forEach(v -> {
-        String value = selector.lookupName(v);
-        if (value == null) {
-          BloomKFilter.addBytes(buf, null, 0, 0);
-        } else {
-          BloomKFilter.addString(buf, value);
-        }
-      });
-    } else {
-      String value = (String) selector.getObject();
-      if (value == null) {
-        BloomKFilter.addBytes(buf, null, 0, 0);
-      } else {
-        BloomKFilter.addString(buf, value);
-      }
-    }
-  }
-}
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java
index 790cf8c..da4479b 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java
@@ -241,13 +241,15 @@ public class BloomFilterAggregatorTest
   public void testAggregateValues() throws IOException
   {
     DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values1, null);
-    StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, new BloomKFilter(maxNumValues));
+    StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, maxNumValues, true);
 
     for (int i = 0; i < values1.size(); ++i) {
       aggregateDimension(Collections.singletonList(dimSelector), agg);
     }
 
-    BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get());
+    BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get())
+    );
     String serialized = filterToString(bloomKFilter);
     Assert.assertEquals(serializedFilter1, serialized);
   }
@@ -256,13 +258,15 @@ public class BloomFilterAggregatorTest
   public void testAggregateLongValues() throws IOException
   {
     TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1));
-    LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, new BloomKFilter(maxNumValues));
+    LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, maxNumValues, true);
 
     for (Long ignored : longValues1) {
       aggregateColumn(Collections.singletonList(selector), agg);
     }
 
-    BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get());
+    BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get())
+    );
     String serialized = filterToString(bloomKFilter);
     Assert.assertEquals(serializedLongFilter, serialized);
   }
@@ -271,13 +275,15 @@ public class BloomFilterAggregatorTest
   public void testAggregateFloatValues() throws IOException
   {
     TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(floatValues1));
-    FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, new BloomKFilter(maxNumValues));
+    FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, maxNumValues, true);
 
     for (Float ignored : floatValues1) {
       aggregateColumn(Collections.singletonList(selector), agg);
     }
 
-    BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get());
+    BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get())
+    );
     String serialized = filterToString(bloomKFilter);
     Assert.assertEquals(serializedFloatFilter, serialized);
   }
@@ -286,13 +292,15 @@ public class BloomFilterAggregatorTest
   public void testAggregateDoubleValues() throws IOException
   {
     TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1));
-    DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, new BloomKFilter(maxNumValues));
+    DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, maxNumValues, true);
 
     for (Double ignored : doubleValues1) {
       aggregateColumn(Collections.singletonList(selector), agg);
     }
 
-    BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get());
+    BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get())
+    );
     String serialized = filterToString(bloomKFilter);
     Assert.assertEquals(serializedDoubleFilter, serialized);
   }
@@ -301,7 +309,7 @@ public class BloomFilterAggregatorTest
   public void testBufferAggregateStringValues() throws IOException
   {
     DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values2, null);
-    StringBloomFilterBufferAggregator agg = new StringBloomFilterBufferAggregator(dimSelector, maxNumValues);
+    StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, maxNumValues, true);
 
     int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls();
     ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
@@ -313,7 +321,9 @@ public class BloomFilterAggregatorTest
     for (int i = 0; i < values2.size(); ++i) {
       bufferAggregateDimension(Collections.singletonList(dimSelector), agg, buf, pos);
     }
-    BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos));
+    BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos))
+    );
     String serialized = filterToString(bloomKFilter);
     Assert.assertEquals(serializedFilter2, serialized);
   }
@@ -322,7 +332,7 @@ public class BloomFilterAggregatorTest
   public void testBufferAggregateLongValues() throws IOException
   {
     TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1));
-    LongBloomFilterBufferAggregator agg = new LongBloomFilterBufferAggregator(selector, maxNumValues);
+    LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, maxNumValues, true);
 
     int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls();
     ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
@@ -333,7 +343,9 @@ public class BloomFilterAggregatorTest
 
     IntStream.range(0, longValues1.length)
              .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos));
-    BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos));
+    BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos))
+    );
     String serialized = filterToString(bloomKFilter);
     Assert.assertEquals(serializedLongFilter, serialized);
   }
@@ -342,7 +354,7 @@ public class BloomFilterAggregatorTest
   public void testBufferAggregateFloatValues() throws IOException
   {
     TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(floatValues1));
-    FloatBloomFilterBufferAggregator agg = new FloatBloomFilterBufferAggregator(selector, maxNumValues);
+    FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, maxNumValues, true);
 
     int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls();
     ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
@@ -353,7 +365,9 @@ public class BloomFilterAggregatorTest
 
     IntStream.range(0, floatValues1.length)
              .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos));
-    BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos));
+    BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos))
+    );
     String serialized = filterToString(bloomKFilter);
     Assert.assertEquals(serializedFloatFilter, serialized);
   }
@@ -362,7 +376,7 @@ public class BloomFilterAggregatorTest
   public void testBufferAggregateDoubleValues() throws IOException
   {
     TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1));
-    DoubleBloomFilterBufferAggregator agg = new DoubleBloomFilterBufferAggregator(selector, maxNumValues);
+    DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, maxNumValues, true);
 
     int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls();
     ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
@@ -373,7 +387,9 @@ public class BloomFilterAggregatorTest
 
     IntStream.range(0, doubleValues1.length)
              .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos));
-    BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos));
+    BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos))
+    );
     String serialized = filterToString(bloomKFilter);
     Assert.assertEquals(serializedDoubleFilter, serialized);
   }
@@ -384,8 +400,8 @@ public class BloomFilterAggregatorTest
     DimensionSelector dimSelector1 = new CardinalityAggregatorTest.TestDimensionSelector(values1, null);
     DimensionSelector dimSelector2 = new CardinalityAggregatorTest.TestDimensionSelector(values2, null);
 
-    StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, new BloomKFilter(maxNumValues));
-    StringBloomFilterAggregator agg2 = new StringBloomFilterAggregator(dimSelector2, new BloomKFilter(maxNumValues));
+    StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, maxNumValues, true);
+    StringBloomFilterAggregator agg2 = new StringBloomFilterAggregator(dimSelector2, maxNumValues, true);
 
     for (int i = 0; i < values1.size(); ++i) {
       aggregateDimension(Collections.singletonList(dimSelector1), agg1);
@@ -394,10 +410,12 @@ public class BloomFilterAggregatorTest
       aggregateDimension(Collections.singletonList(dimSelector2), agg2);
     }
 
-    BloomKFilter combined = (BloomKFilter) valueAggregatorFactory.finalizeComputation(
-        valueAggregatorFactory.combine(
-            agg1.get(),
-            agg2.get()
+    BloomKFilter combined = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.finalizeComputation(
+          valueAggregatorFactory.combine(
+              agg1.get(),
+              agg2.get()
+          )
         )
     );
 
@@ -408,19 +426,25 @@ public class BloomFilterAggregatorTest
   @Test
   public void testMergeValues() throws IOException
   {
-    final TestBloomFilterColumnSelector mergeDim =
-        new TestBloomFilterColumnSelector(ImmutableList.of(filter1, filter2));
+    final TestBloomFilterBufferColumnSelector mergeDim =
+        new TestBloomFilterBufferColumnSelector(
+            ImmutableList.of(
+                ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)),
+                ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2))
+            )
+        );
 
     BloomFilterMergeAggregator mergeAggregator =
-        new BloomFilterMergeAggregator(mergeDim, new BloomKFilter(maxNumValues));
+        new BloomFilterMergeAggregator(mergeDim, maxNumValues, true);
 
     for (int i = 0; i < 2; ++i) {
       aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator);
     }
 
 
-    BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory()
-                                                               .finalizeComputation(mergeAggregator.get());
+    BloomKFilter merged = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get())
+    );
     String serialized = filterToString(merged);
     Assert.assertEquals(serializedCombinedFilter, serialized);
   }
@@ -428,8 +452,8 @@ public class BloomFilterAggregatorTest
   @Test
   public void testMergeValuesWithBuffersForGroupByV1() throws IOException
   {
-    final TestBloomFilterColumnSelector mergeDim =
-        new TestBloomFilterColumnSelector(
+    final TestBloomFilterBufferColumnSelector mergeDim =
+        new TestBloomFilterBufferColumnSelector(
             ImmutableList.of(
                 ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)),
                 ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2))
@@ -437,15 +461,16 @@ public class BloomFilterAggregatorTest
         );
 
     BloomFilterMergeAggregator mergeAggregator =
-        new BloomFilterMergeAggregator(mergeDim, new BloomKFilter(maxNumValues));
+        new BloomFilterMergeAggregator(mergeDim, maxNumValues, true);
 
     for (int i = 0; i < 2; ++i) {
       aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator);
     }
 
 
-    BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory()
-                                                               .finalizeComputation(mergeAggregator.get());
+    BloomKFilter merged = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get())
+    );
     String serialized = filterToString(merged);
     Assert.assertEquals(serializedCombinedFilter, serialized);
   }
@@ -461,7 +486,7 @@ public class BloomFilterAggregatorTest
             )
         );
 
-    BloomFilterMergeBufferAggregator mergeAggregator = new BloomFilterMergeBufferAggregator(mergeDim, maxNumValues);
+    BloomFilterMergeAggregator mergeAggregator = new BloomFilterMergeAggregator(mergeDim, maxNumValues, false);
 
     int maxSize = valueAggregatorFactory.getCombiningFactory().getMaxIntermediateSizeWithNulls();
     ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
@@ -474,8 +499,9 @@ public class BloomFilterAggregatorTest
       bufferAggregateColumn(Collections.singletonList(mergeDim), mergeAggregator, buf, pos);
     }
 
-    BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory()
-                                                               .finalizeComputation(mergeAggregator.get(buf, pos));
+    BloomKFilter merged = BloomKFilter.deserialize(
+        (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get(buf, pos))
+    );
     String serialized = filterToString(merged);
 
     Assert.assertEquals(serializedCombinedFilter, serialized);
@@ -596,14 +622,6 @@ public class BloomFilterAggregatorTest
     }
   }
 
-  public static class TestBloomFilterColumnSelector extends SteppableSelector<Object>
-  {
-    public TestBloomFilterColumnSelector(List<Object> values)
-    {
-      super(values);
-    }
-  }
-
   public static class TestBloomFilterBufferColumnSelector extends SteppableSelector<ByteBuffer>
   {
     public TestBloomFilterBufferColumnSelector(List<ByteBuffer> values)
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
index a2207f2..ce3b932 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
@@ -42,6 +42,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -108,9 +109,10 @@ public class BloomFilterGroupByQueryTest
     MapBasedRow row = ingestAndQuery(query);
 
 
-    Assert.assertTrue(((BloomKFilter) row.getRaw("blooming_quality")).testString("mezzanine"));
-    Assert.assertTrue(((BloomKFilter) row.getRaw("blooming_quality")).testString("premium"));
-    Assert.assertFalse(((BloomKFilter) row.getRaw("blooming_quality")).testString("entertainment"));
+    BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("blooming_quality"));
+    Assert.assertTrue(filter.testString("mezzanine"));
+    Assert.assertTrue(filter.testString("premium"));
+    Assert.assertFalse(filter.testString("entertainment"));
   }
 
   @Test
@@ -135,7 +137,7 @@ public class BloomFilterGroupByQueryTest
 
     Object val = row.getRaw("blooming_quality");
 
-    String serialized = BloomFilterAggregatorTest.filterToString((BloomKFilter) val);
+    String serialized = BloomFilterAggregatorTest.filterToString(BloomKFilter.deserialize((ByteBuffer) val));
     String empty = BloomFilterAggregatorTest.filterToString(filter);
 
     Assert.assertEquals(empty, serialized);
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
index ad4f90a..f8f6489 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
@@ -179,7 +179,6 @@ public class BloomFilterSqlAggregatorTest
                     .rows(CalciteTests.ROWS1_WITH_NUMERIC_DIMS)
                     .buildMMappedIndex();
 
-
     walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
         DataSegment.builder()
                    .dataSource(DATA_SOURCE)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 2c09568..8e8934a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -424,8 +424,6 @@ public class CalciteTests
       )
   );
 
-
-
   public static final List<InputRow> ROWS2 = ImmutableList.of(
       createRow("2000-01-01", "דרואיד", "he", 1.0),
       createRow("2000-01-01", "druid", "en", 1.0),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org