You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/04/24 22:14:09 UTC
[incubator-druid] 08/20: refactor druid-bloom-filter aggregators (#7496)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.14.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 357d1862a4528a2f84a4d146f7a751a859a84f7c
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Thu Apr 18 11:54:06 2019 -0700
refactor druid-bloom-filter aggregators (#7496)
* now with 100% more buffer
* there can be only 1
* simplify
* javadoc
* clean up unused test method
* fix exception message
* style
* why does style hate javadocs
* review stuff
* style :(
---
.../bloom/BaseBloomFilterAggregator.java | 113 ++++++++++++++++++++-
.../bloom/BaseBloomFilterBufferAggregator.java | 105 -------------------
.../bloom/BloomFilterAggregateCombiner.java | 72 -------------
.../bloom/BloomFilterAggregatorFactory.java | 82 +++++++++------
.../bloom/BloomFilterMergeAggregator.java | 32 ++----
.../bloom/BloomFilterMergeAggregatorFactory.java | 25 +++--
.../bloom/BloomFilterMergeBufferAggregator.java | 40 --------
.../bloom/DoubleBloomFilterAggregator.java | 12 ++-
.../bloom/DoubleBloomFilterBufferAggregator.java | 44 --------
.../bloom/EmptyBloomFilterAggregator.java | 37 -------
.../bloom/FloatBloomFilterAggregator.java | 12 ++-
.../bloom/FloatBloomFilterBufferAggregator.java | 44 --------
.../bloom/LongBloomFilterAggregator.java | 12 ++-
.../bloom/LongBloomFilterBufferAggregator.java | 44 --------
...regator.java => NoopBloomFilterAggregator.java} | 12 ++-
.../bloom/StringBloomFilterAggregator.java | 18 ++--
.../bloom/StringBloomFilterBufferAggregator.java | 56 ----------
.../bloom/BloomFilterAggregatorTest.java | 104 +++++++++++--------
.../bloom/BloomFilterGroupByQueryTest.java | 10 +-
.../bloom/sql/BloomFilterSqlAggregatorTest.java | 1 -
.../druid/sql/calcite/util/CalciteTests.java | 2 -
21 files changed, 287 insertions(+), 590 deletions(-)
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java
index 652236b..48ba083 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java
@@ -20,20 +20,119 @@
package org.apache.druid.query.aggregation.bloom;
import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.filter.BloomKFilter;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseNullableColumnValueSelector;
import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
-public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector> implements Aggregator
+/**
+ * All bloom filter aggregations are done using {@link ByteBuffer}, so this base class implements both {@link Aggregator}
+ * and {@link BufferAggregator}.
+ *
+ * If used as an {@link Aggregator} the caller MUST specify the 'onHeap' parameter in the
+ * constructor as "true", or else the "collector" will not be allocated and null pointer exceptions will make things sad.
+ *
+ * If used as a {@link BufferAggregator}, the "collector" buffer is not necessary, so 'onHeap' should be "false";
+ * passing "true" merely allocates an unused buffer, unlike the {@link Aggregator} case where a wrong value fails.
+ *
+ * {@link BloomFilterAggregatorFactory} and {@link BloomFilterMergeAggregatorFactory}, which should be the creators of
+ * all implementations of {@link BaseBloomFilterAggregator} outside of tests, should be sure to set the 'onHeap' value
+ * to "true" for
+ * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and to "false" for
+ * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered}.
+ *
+ * @param <TSelector> type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values
+ * to add to a bloom filter, or other bloom filters to merge into this bloom filter.
+ */
+public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector>
+ implements BufferAggregator, Aggregator
{
- final BloomKFilter collector;
+ @Nullable
+ private final ByteBuffer collector;
+ protected final int maxNumEntries;
protected final TSelector selector;
- BaseBloomFilterAggregator(TSelector selector, BloomKFilter collector)
+ /**
+ * @param selector selector that feeds values to the aggregator
+ * @param maxNumEntries maximum number of entries that can be added to a bloom filter before accuracy degrades rapidly
+ * @param onHeap allocate a ByteBuffer "collector" to use as an {@link Aggregator}
+ */
+ BaseBloomFilterAggregator(TSelector selector, int maxNumEntries, boolean onHeap)
{
- this.collector = collector;
this.selector = selector;
+ this.maxNumEntries = maxNumEntries;
+ if (onHeap) {
+ BloomKFilter bloomFilter = new BloomKFilter(maxNumEntries);
+ this.collector = ByteBuffer.allocate(BloomKFilter.computeSizeBytes(maxNumEntries));
+ BloomKFilter.serialize(collector, bloomFilter);
+ } else {
+ collector = null;
+ }
+ }
+
+ abstract void bufferAdd(ByteBuffer buf);
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ final ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+ BloomKFilter filter = new BloomKFilter(maxNumEntries);
+ BloomKFilter.serialize(mutationBuffer, filter);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ final int oldPosition = buf.position();
+ try {
+ buf.position(position);
+ bufferAdd(buf);
+ }
+ finally {
+ buf.position(oldPosition);
+ }
+ }
+
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ ByteBuffer mutationBuffer = buf.duplicate();
+ mutationBuffer.position(position);
+ int sizeBytes = BloomKFilter.computeSizeBytes(maxNumEntries);
+ mutationBuffer.limit(position + sizeBytes);
+
+ ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes);
+ resultCopy.put(mutationBuffer.slice());
+ resultCopy.rewind();
+ return resultCopy;
+ }
+
+ @Override
+ public float getFloat(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("BloomFilterAggregator does not support getFloat()");
+ }
+
+ @Override
+ public long getLong(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("BloomFilterAggregator does not support getLong()");
+ }
+
+ @Override
+ public double getDouble(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("BloomFilterAggregator does not support getDouble()");
+ }
+
+ @Override
+ public void aggregate()
+ {
+ aggregate(collector, 0);
}
@Nullable
@@ -66,4 +165,10 @@ public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableCo
{
// nothing to close
}
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("selector", selector);
+ }
}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java
deleted file mode 100644
index ff866f9..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import org.apache.druid.segment.BaseNullableColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public abstract class BaseBloomFilterBufferAggregator<TSelector extends BaseNullableColumnValueSelector> implements BufferAggregator
-{
- protected final int maxNumEntries;
- protected final TSelector selector;
-
- BaseBloomFilterBufferAggregator(TSelector selector, int maxNumEntries)
- {
- this.selector = selector;
- this.maxNumEntries = maxNumEntries;
- }
-
- abstract void bufferAdd(ByteBuffer buf);
-
- @Override
- public void init(ByteBuffer buf, int position)
- {
- final ByteBuffer mutationBuffer = buf.duplicate();
- mutationBuffer.position(position);
- BloomKFilter filter = new BloomKFilter(maxNumEntries);
- BloomKFilter.serialize(mutationBuffer, filter);
- }
-
- @Override
- public void aggregate(ByteBuffer buf, int position)
- {
- final int oldPosition = buf.position();
- buf.position(position);
- bufferAdd(buf);
- buf.position(oldPosition);
- }
-
-
- @Override
- public Object get(ByteBuffer buf, int position)
- {
- ByteBuffer mutationBuffer = buf.duplicate();
- mutationBuffer.position(position);
- // | k (byte) | numLongs (int) | bitset (long[numLongs]) |
- int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES);
- mutationBuffer.limit(position + sizeBytes);
-
- ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes);
- resultCopy.put(mutationBuffer.slice());
- resultCopy.rewind();
- return resultCopy;
- }
-
- @Override
- public float getFloat(ByteBuffer buf, int position)
- {
- throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getFloat()");
- }
-
- @Override
- public long getLong(ByteBuffer buf, int position)
- {
- throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getLong()");
- }
-
- @Override
- public double getDouble(ByteBuffer buf, int position)
- {
- throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()");
- }
-
- @Override
- public void close()
- {
- // nothing to close
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("selector", selector);
- }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java
deleted file mode 100644
index 6fc4bf9..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.ColumnValueSelector;
-
-import javax.annotation.Nullable;
-
-public class BloomFilterAggregateCombiner extends ObjectAggregateCombiner<BloomKFilter>
-{
- @Nullable
- private BloomKFilter combined;
-
- private final int maxNumEntries;
-
- public BloomFilterAggregateCombiner(int maxNumEntries)
- {
- this.maxNumEntries = maxNumEntries;
- }
-
- @Override
- public void reset(ColumnValueSelector selector)
- {
- combined = null;
- fold(selector);
- }
-
- @Override
- public void fold(ColumnValueSelector selector)
- {
- BloomKFilter other = (BloomKFilter) selector.getObject();
- if (other == null) {
- return;
- }
- if (combined == null) {
- combined = new BloomKFilter(maxNumEntries);
- }
- combined.merge(other);
- }
-
- @Nullable
- @Override
- public BloomKFilter getObject()
- {
- return combined;
- }
-
- @Override
- public Class<? extends BloomKFilter> classOfObject()
- {
- return BloomKFilter.class;
- }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
index af60135..74e921c 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
@@ -40,7 +40,6 @@ import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
@@ -59,10 +58,6 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
BloomKFilter.getNumSetBits(buf1, buf1.position()),
BloomKFilter.getNumSetBits(buf2, buf2.position())
);
- } else if (o1 instanceof BloomKFilter && o2 instanceof BloomKFilter) {
- BloomKFilter o1f = (BloomKFilter) o1;
- BloomKFilter o2f = (BloomKFilter) o2;
- return Integer.compare(o1f.getNumSetBits(), o2f.getNumSetBits());
} else {
throw new RE("Unable to compare unexpected types [%s]", o1.getClass().getName());
}
@@ -87,14 +82,13 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
@Override
public Aggregator factorize(ColumnSelectorFactory columnFactory)
{
- BloomKFilter filter = new BloomKFilter(maxNumEntries);
ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
if (capabilities == null) {
BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
if (selector instanceof NilColumnValueSelector) {
// BloomKFilter must be the same size so we cannot use a constant for the empty agg
- return new EmptyBloomFilterAggregator(filter);
+ return new NoopBloomFilterAggregator(maxNumEntries, true);
}
throw new IAE(
"Cannot create bloom filter buffer aggregator for column selector type [%s]",
@@ -104,13 +98,29 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
ValueType type = capabilities.getType();
switch (type) {
case STRING:
- return new StringBloomFilterAggregator(columnFactory.makeDimensionSelector(field), filter);
+ return new StringBloomFilterAggregator(
+ columnFactory.makeDimensionSelector(field),
+ maxNumEntries,
+ true
+ );
case LONG:
- return new LongBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter);
+ return new LongBloomFilterAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ true
+ );
case FLOAT:
- return new FloatBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter);
+ return new FloatBloomFilterAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ true
+ );
case DOUBLE:
- return new DoubleBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter);
+ return new DoubleBloomFilterAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ true
+ );
default:
throw new IAE("Cannot create bloom filter aggregator for invalid column type [%s]", type);
}
@@ -124,7 +134,7 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
if (capabilities == null) {
BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
if (selector instanceof NilColumnValueSelector) {
- return new EmptyBloomFilterBufferAggregator(maxNumEntries);
+ return new NoopBloomFilterAggregator(maxNumEntries, false);
}
throw new IAE(
"Cannot create bloom filter buffer aggregator for column selector type [%s]",
@@ -135,18 +145,28 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
ValueType type = capabilities.getType();
switch (type) {
case STRING:
- return new StringBloomFilterBufferAggregator(columnFactory.makeDimensionSelector(field), maxNumEntries);
+ return new StringBloomFilterAggregator(
+ columnFactory.makeDimensionSelector(field),
+ maxNumEntries,
+ false
+ );
case LONG:
- return new LongBloomFilterBufferAggregator(
- columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries
+ return new LongBloomFilterAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ false
);
case FLOAT:
- return new FloatBloomFilterBufferAggregator(
- columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries
+ return new FloatBloomFilterAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ false
);
case DOUBLE:
- return new DoubleBloomFilterBufferAggregator(
- columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries
+ return new DoubleBloomFilterAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ false
);
default:
throw new IAE("Cannot create bloom filter buffer aggregator for invalid column type [%s]", type);
@@ -168,14 +188,19 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
if (lhs == null) {
return rhs;
}
- ((BloomKFilter) lhs).merge((BloomKFilter) rhs);
+ BloomKFilter.mergeBloomFilterByteBuffers(
+ (ByteBuffer) lhs,
+ ((ByteBuffer) lhs).position(),
+ (ByteBuffer) rhs,
+ ((ByteBuffer) rhs).position()
+ );
return lhs;
}
@Override
public AggregateCombiner makeAggregateCombiner()
{
- return new BloomFilterAggregateCombiner(maxNumEntries);
+ throw new UnsupportedOperationException("Bloom filter aggregators are query-time only");
}
@Override
@@ -195,6 +220,8 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
{
if (object instanceof String) {
return ByteBuffer.wrap(StringUtils.decodeBase64String((String) object));
+ } else if (object instanceof byte[]) {
+ return ByteBuffer.wrap((byte[]) object);
} else {
return object;
}
@@ -203,18 +230,7 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
@Override
public Object finalizeComputation(Object object)
{
- try {
- if (object instanceof ByteBuffer) {
- return BloomKFilter.deserialize((ByteBuffer) object);
- } else if (object instanceof byte[]) {
- return BloomKFilter.deserialize(ByteBuffer.wrap((byte[]) object));
- } else {
- return object;
- }
- }
- catch (IOException ioe) {
- throw new RuntimeException("Failed to deserialize BloomKFilter", ioe);
- }
+ return object;
}
@JsonProperty
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
index 67d7a70..011f2f6 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
@@ -19,39 +19,27 @@
package org.apache.druid.query.aggregation.bloom;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.ColumnValueSelector;
-import java.io.IOException;
import java.nio.ByteBuffer;
-public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<Object>>
+public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<ByteBuffer>>
{
- public BloomFilterMergeAggregator(ColumnValueSelector<Object> selector, BloomKFilter collector)
+ public BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
{
- super(selector, collector);
+ super(selector, maxNumEntries, onHeap);
}
@Override
- public void aggregate()
+ public void bufferAdd(ByteBuffer buf)
{
- Object other = selector.getObject();
- if (other != null) {
- if (other instanceof BloomKFilter) {
- collector.merge((BloomKFilter) other);
- } else if (other instanceof ByteBuffer) {
- // fun fact: because bloom filter agg factory deserialize returns a byte buffer to avoid unnecessary serde,
- // but GroupByQueryEngine (group by v1) ends up trying to merge ByteBuffers from buffer aggs with this agg
- // instead of the BloomFilterBufferMergeAggregator. fun! Also, it requires a 'ComplexMetricSerde' to be
- // registered even for query time only aggs, but then never uses it. also fun!
- try {
- BloomKFilter otherFilter = BloomKFilter.deserialize((ByteBuffer) other);
- collector.merge(otherFilter);
- }
- catch (IOException ioe) {
- throw new RuntimeException("Failed to deserialize BloomKFilter", ioe);
- }
- }
+ ByteBuffer other = selector.getObject();
+ if (other == null) {
+ // nulls should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected
+ throw new ISE("WTF?! Unexpected null value in BloomFilterMergeAggregator");
}
+ BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position());
}
}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java
index 8dab867..ed5ce29 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java
@@ -25,7 +25,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.BaseNullableColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
@@ -48,23 +47,13 @@ public class BloomFilterMergeAggregatorFactory extends BloomFilterAggregatorFact
@Override
public Aggregator factorize(final ColumnSelectorFactory metricFactory)
{
- final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
- // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected
- if (selector instanceof NilColumnValueSelector) {
- throw new ISE("WTF?! Unexpected NilColumnValueSelector");
- }
- return new BloomFilterMergeAggregator((ColumnValueSelector<Object>) selector, new BloomKFilter(getMaxNumEntries()));
+ return makeMergeAggregator(metricFactory);
}
@Override
public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory)
{
- final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
- // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected
- if (selector instanceof NilColumnValueSelector) {
- throw new ISE("WTF?! Unexpected NilColumnValueSelector");
- }
- return new BloomFilterMergeBufferAggregator((ColumnValueSelector<ByteBuffer>) selector, getMaxNumEntries());
+ return makeMergeAggregator(metricFactory);
}
@Override
@@ -81,4 +70,14 @@ public class BloomFilterMergeAggregatorFactory extends BloomFilterAggregatorFact
.appendInt(getMaxNumEntries())
.build();
}
+
+ private BloomFilterMergeAggregator makeMergeAggregator(ColumnSelectorFactory metricFactory)
+ {
+ final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
+ // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected
+ if (selector instanceof NilColumnValueSelector) {
+ throw new ISE("WTF?! Unexpected NilColumnValueSelector");
+ }
+ return new BloomFilterMergeAggregator((ColumnValueSelector<ByteBuffer>) selector, getMaxNumEntries(), true);
+ }
}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java
deleted file mode 100644
index 026a23e..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.ColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public final class BloomFilterMergeBufferAggregator extends BaseBloomFilterBufferAggregator<ColumnValueSelector<ByteBuffer>>
-{
- public BloomFilterMergeBufferAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries)
- {
- super(selector, maxNumEntries);
- }
-
- @Override
- public void bufferAdd(ByteBuffer buf)
- {
- ByteBuffer other = selector.getObject();
- BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position());
- }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java
index dfdae6c..8aa899e 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java
@@ -23,20 +23,22 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import java.nio.ByteBuffer;
+
public final class DoubleBloomFilterAggregator extends BaseBloomFilterAggregator<BaseDoubleColumnValueSelector>
{
- DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, BloomKFilter collector)
+ DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries, boolean onHeap)
{
- super(selector, collector);
+ super(selector, maxNumEntries, onHeap);
}
@Override
- public void aggregate()
+ public void bufferAdd(ByteBuffer buf)
{
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
- collector.addDouble(selector.getDouble());
+ BloomKFilter.addDouble(buf, selector.getDouble());
} else {
- collector.addBytes(null, 0, 0);
+ BloomKFilter.addBytes(buf, null, 0, 0);
}
}
}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java
deleted file mode 100644
index e84b9fc..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public final class DoubleBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<BaseDoubleColumnValueSelector>
-{
- DoubleBloomFilterBufferAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries)
- {
- super(selector, maxNumEntries);
- }
-
- @Override
- public void bufferAdd(ByteBuffer buf)
- {
- if (NullHandling.replaceWithDefault() || !selector.isNull()) {
- BloomKFilter.addDouble(buf, selector.getDouble());
- } else {
- BloomKFilter.addBytes(buf, null, 0, 0);
- }
- }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterAggregator.java
deleted file mode 100644
index 57df6f2..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterAggregator.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.NilColumnValueSelector;
-
-public final class EmptyBloomFilterAggregator extends BaseBloomFilterAggregator<NilColumnValueSelector>
-{
- EmptyBloomFilterAggregator(BloomKFilter collector)
- {
- super(NilColumnValueSelector.instance(), collector);
- }
-
- @Override
- public void aggregate()
- {
- // nothing to do
- }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java
index ae53d16..0a7c042 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java
@@ -23,20 +23,22 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
+import java.nio.ByteBuffer;
+
public final class FloatBloomFilterAggregator extends BaseBloomFilterAggregator<BaseFloatColumnValueSelector>
{
- FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, BloomKFilter collector)
+ FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries, boolean onHeap)
{
- super(selector, collector);
+ super(selector, maxNumEntries, onHeap);
}
@Override
- public void aggregate()
+ public void bufferAdd(ByteBuffer buf)
{
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
- collector.addFloat(selector.getFloat());
+ BloomKFilter.addFloat(buf, selector.getFloat());
} else {
- collector.addBytes(null, 0, 0);
+ BloomKFilter.addBytes(buf, null, 0, 0);
}
}
}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java
deleted file mode 100644
index 27e88d4..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.BaseFloatColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public final class FloatBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<BaseFloatColumnValueSelector>
-{
- FloatBloomFilterBufferAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries)
- {
- super(selector, maxNumEntries);
- }
-
- @Override
- public void bufferAdd(ByteBuffer buf)
- {
- if (NullHandling.replaceWithDefault() || !selector.isNull()) {
- BloomKFilter.addFloat(buf, selector.getFloat());
- } else {
- BloomKFilter.addBytes(buf, null, 0, 0);
- }
- }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java
index caa4739..3e232ce 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java
@@ -23,20 +23,22 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.BaseLongColumnValueSelector;
+import java.nio.ByteBuffer;
+
public final class LongBloomFilterAggregator extends BaseBloomFilterAggregator<BaseLongColumnValueSelector>
{
- LongBloomFilterAggregator(BaseLongColumnValueSelector selector, BloomKFilter collector)
+ LongBloomFilterAggregator(BaseLongColumnValueSelector selector, int maxNumEntries, boolean onHeap)
{
- super(selector, collector);
+ super(selector, maxNumEntries, onHeap);
}
@Override
- public void aggregate()
+ public void bufferAdd(ByteBuffer buf)
{
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
- collector.addLong(selector.getLong());
+ BloomKFilter.addLong(buf, selector.getLong());
} else {
- collector.addBytes(null, 0, 0);
+ BloomKFilter.addBytes(buf, null, 0, 0);
}
}
}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java
deleted file mode 100644
index 13a6634..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.BaseLongColumnValueSelector;
-
-import java.nio.ByteBuffer;
-
-public final class LongBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<BaseLongColumnValueSelector>
-{
- LongBloomFilterBufferAggregator(BaseLongColumnValueSelector selector, int maxNumEntries)
- {
- super(selector, maxNumEntries);
- }
-
- @Override
- public void bufferAdd(ByteBuffer buf)
- {
- if (NullHandling.replaceWithDefault() || !selector.isNull()) {
- BloomKFilter.addLong(buf, selector.getLong());
- } else {
- BloomKFilter.addBytes(buf, null, 0, 0);
- }
- }
-}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java
similarity index 78%
rename from extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterBufferAggregator.java
rename to extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java
index 7b6301d..ec23df5 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterBufferAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java
@@ -23,11 +23,11 @@ import org.apache.druid.segment.NilColumnValueSelector;
import java.nio.ByteBuffer;
-public final class EmptyBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<NilColumnValueSelector>
+public final class NoopBloomFilterAggregator extends BaseBloomFilterAggregator<NilColumnValueSelector>
{
- EmptyBloomFilterBufferAggregator(int maxNumEntries)
+ NoopBloomFilterAggregator(int maxNumEntries, boolean onHeap)
{
- super(NilColumnValueSelector.instance(), maxNumEntries);
+ super(NilColumnValueSelector.instance(), maxNumEntries, onHeap);
}
@Override
@@ -41,4 +41,10 @@ public final class EmptyBloomFilterBufferAggregator extends BaseBloomFilterBuffe
{
// nothing to do
}
+
+ @Override
+ public void aggregate()
+ {
+ // nothing to do
+ }
}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
index 351ef84..f3f6dae 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
@@ -22,32 +22,34 @@ package org.apache.druid.query.aggregation.bloom;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.DimensionSelector;
+import java.nio.ByteBuffer;
+
public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator<DimensionSelector>
{
- StringBloomFilterAggregator(DimensionSelector selector, BloomKFilter collector)
+
+ StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries, boolean onHeap)
{
- super(selector, collector);
+ super(selector, maxNumEntries, onHeap);
}
@Override
- public void aggregate()
+ public void bufferAdd(ByteBuffer buf)
{
- // note: there might be room for optimization here but behavior must match BloomDimFilter implementation
if (selector.getRow().size() > 1) {
selector.getRow().forEach(v -> {
String value = selector.lookupName(v);
if (value == null) {
- collector.addBytes(null, 0, 0);
+ BloomKFilter.addBytes(buf, null, 0, 0);
} else {
- collector.addString(value);
+ BloomKFilter.addString(buf, value);
}
});
} else {
String value = (String) selector.getObject();
if (value == null) {
- collector.addBytes(null, 0, 0);
+ BloomKFilter.addBytes(buf, null, 0, 0);
} else {
- collector.addString(value);
+ BloomKFilter.addString(buf, value);
}
}
}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java
deleted file mode 100644
index c7c17c9..0000000
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.bloom;
-
-import org.apache.druid.query.filter.BloomKFilter;
-import org.apache.druid.segment.DimensionSelector;
-
-import java.nio.ByteBuffer;
-
-public final class StringBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<DimensionSelector>
-{
-
- StringBloomFilterBufferAggregator(DimensionSelector selector, int maxNumEntries)
- {
- super(selector, maxNumEntries);
- }
-
- @Override
- public void bufferAdd(ByteBuffer buf)
- {
- if (selector.getRow().size() > 1) {
- selector.getRow().forEach(v -> {
- String value = selector.lookupName(v);
- if (value == null) {
- BloomKFilter.addBytes(buf, null, 0, 0);
- } else {
- BloomKFilter.addString(buf, value);
- }
- });
- } else {
- String value = (String) selector.getObject();
- if (value == null) {
- BloomKFilter.addBytes(buf, null, 0, 0);
- } else {
- BloomKFilter.addString(buf, value);
- }
- }
- }
-}
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java
index 790cf8c..da4479b 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java
@@ -241,13 +241,15 @@ public class BloomFilterAggregatorTest
public void testAggregateValues() throws IOException
{
DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values1, null);
- StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, new BloomKFilter(maxNumValues));
+ StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, maxNumValues, true);
for (int i = 0; i < values1.size(); ++i) {
aggregateDimension(Collections.singletonList(dimSelector), agg);
}
- BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get());
+ BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get())
+ );
String serialized = filterToString(bloomKFilter);
Assert.assertEquals(serializedFilter1, serialized);
}
@@ -256,13 +258,15 @@ public class BloomFilterAggregatorTest
public void testAggregateLongValues() throws IOException
{
TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1));
- LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, new BloomKFilter(maxNumValues));
+ LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, maxNumValues, true);
for (Long ignored : longValues1) {
aggregateColumn(Collections.singletonList(selector), agg);
}
- BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get());
+ BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get())
+ );
String serialized = filterToString(bloomKFilter);
Assert.assertEquals(serializedLongFilter, serialized);
}
@@ -271,13 +275,15 @@ public class BloomFilterAggregatorTest
public void testAggregateFloatValues() throws IOException
{
TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(floatValues1));
- FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, new BloomKFilter(maxNumValues));
+ FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, maxNumValues, true);
for (Float ignored : floatValues1) {
aggregateColumn(Collections.singletonList(selector), agg);
}
- BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get());
+ BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get())
+ );
String serialized = filterToString(bloomKFilter);
Assert.assertEquals(serializedFloatFilter, serialized);
}
@@ -286,13 +292,15 @@ public class BloomFilterAggregatorTest
public void testAggregateDoubleValues() throws IOException
{
TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1));
- DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, new BloomKFilter(maxNumValues));
+ DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, maxNumValues, true);
for (Double ignored : doubleValues1) {
aggregateColumn(Collections.singletonList(selector), agg);
}
- BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get());
+ BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get())
+ );
String serialized = filterToString(bloomKFilter);
Assert.assertEquals(serializedDoubleFilter, serialized);
}
@@ -301,7 +309,7 @@ public class BloomFilterAggregatorTest
public void testBufferAggregateStringValues() throws IOException
{
DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values2, null);
- StringBloomFilterBufferAggregator agg = new StringBloomFilterBufferAggregator(dimSelector, maxNumValues);
+ StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, maxNumValues, true);
int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls();
ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
@@ -313,7 +321,9 @@ public class BloomFilterAggregatorTest
for (int i = 0; i < values2.size(); ++i) {
bufferAggregateDimension(Collections.singletonList(dimSelector), agg, buf, pos);
}
- BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos));
+ BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos))
+ );
String serialized = filterToString(bloomKFilter);
Assert.assertEquals(serializedFilter2, serialized);
}
@@ -322,7 +332,7 @@ public class BloomFilterAggregatorTest
public void testBufferAggregateLongValues() throws IOException
{
TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1));
- LongBloomFilterBufferAggregator agg = new LongBloomFilterBufferAggregator(selector, maxNumValues);
+ LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, maxNumValues, true);
int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls();
ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
@@ -333,7 +343,9 @@ public class BloomFilterAggregatorTest
IntStream.range(0, longValues1.length)
.forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos));
- BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos));
+ BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos))
+ );
String serialized = filterToString(bloomKFilter);
Assert.assertEquals(serializedLongFilter, serialized);
}
@@ -342,7 +354,7 @@ public class BloomFilterAggregatorTest
public void testBufferAggregateFloatValues() throws IOException
{
TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(floatValues1));
- FloatBloomFilterBufferAggregator agg = new FloatBloomFilterBufferAggregator(selector, maxNumValues);
+ FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, maxNumValues, true);
int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls();
ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
@@ -353,7 +365,9 @@ public class BloomFilterAggregatorTest
IntStream.range(0, floatValues1.length)
.forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos));
- BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos));
+ BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos))
+ );
String serialized = filterToString(bloomKFilter);
Assert.assertEquals(serializedFloatFilter, serialized);
}
@@ -362,7 +376,7 @@ public class BloomFilterAggregatorTest
public void testBufferAggregateDoubleValues() throws IOException
{
TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1));
- DoubleBloomFilterBufferAggregator agg = new DoubleBloomFilterBufferAggregator(selector, maxNumValues);
+ DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, maxNumValues, true);
int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls();
ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
@@ -373,7 +387,9 @@ public class BloomFilterAggregatorTest
IntStream.range(0, doubleValues1.length)
.forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos));
- BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos));
+ BloomKFilter bloomKFilter = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos))
+ );
String serialized = filterToString(bloomKFilter);
Assert.assertEquals(serializedDoubleFilter, serialized);
}
@@ -384,8 +400,8 @@ public class BloomFilterAggregatorTest
DimensionSelector dimSelector1 = new CardinalityAggregatorTest.TestDimensionSelector(values1, null);
DimensionSelector dimSelector2 = new CardinalityAggregatorTest.TestDimensionSelector(values2, null);
- StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, new BloomKFilter(maxNumValues));
- StringBloomFilterAggregator agg2 = new StringBloomFilterAggregator(dimSelector2, new BloomKFilter(maxNumValues));
+ StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, maxNumValues, true);
+ StringBloomFilterAggregator agg2 = new StringBloomFilterAggregator(dimSelector2, maxNumValues, true);
for (int i = 0; i < values1.size(); ++i) {
aggregateDimension(Collections.singletonList(dimSelector1), agg1);
@@ -394,10 +410,12 @@ public class BloomFilterAggregatorTest
aggregateDimension(Collections.singletonList(dimSelector2), agg2);
}
- BloomKFilter combined = (BloomKFilter) valueAggregatorFactory.finalizeComputation(
- valueAggregatorFactory.combine(
- agg1.get(),
- agg2.get()
+ BloomKFilter combined = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.finalizeComputation(
+ valueAggregatorFactory.combine(
+ agg1.get(),
+ agg2.get()
+ )
)
);
@@ -408,19 +426,25 @@ public class BloomFilterAggregatorTest
@Test
public void testMergeValues() throws IOException
{
- final TestBloomFilterColumnSelector mergeDim =
- new TestBloomFilterColumnSelector(ImmutableList.of(filter1, filter2));
+ final TestBloomFilterBufferColumnSelector mergeDim =
+ new TestBloomFilterBufferColumnSelector(
+ ImmutableList.of(
+ ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)),
+ ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2))
+ )
+ );
BloomFilterMergeAggregator mergeAggregator =
- new BloomFilterMergeAggregator(mergeDim, new BloomKFilter(maxNumValues));
+ new BloomFilterMergeAggregator(mergeDim, maxNumValues, true);
for (int i = 0; i < 2; ++i) {
aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator);
}
- BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory()
- .finalizeComputation(mergeAggregator.get());
+ BloomKFilter merged = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get())
+ );
String serialized = filterToString(merged);
Assert.assertEquals(serializedCombinedFilter, serialized);
}
@@ -428,8 +452,8 @@ public class BloomFilterAggregatorTest
@Test
public void testMergeValuesWithBuffersForGroupByV1() throws IOException
{
- final TestBloomFilterColumnSelector mergeDim =
- new TestBloomFilterColumnSelector(
+ final TestBloomFilterBufferColumnSelector mergeDim =
+ new TestBloomFilterBufferColumnSelector(
ImmutableList.of(
ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)),
ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2))
@@ -437,15 +461,16 @@ public class BloomFilterAggregatorTest
);
BloomFilterMergeAggregator mergeAggregator =
- new BloomFilterMergeAggregator(mergeDim, new BloomKFilter(maxNumValues));
+ new BloomFilterMergeAggregator(mergeDim, maxNumValues, true);
for (int i = 0; i < 2; ++i) {
aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator);
}
- BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory()
- .finalizeComputation(mergeAggregator.get());
+ BloomKFilter merged = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get())
+ );
String serialized = filterToString(merged);
Assert.assertEquals(serializedCombinedFilter, serialized);
}
@@ -461,7 +486,7 @@ public class BloomFilterAggregatorTest
)
);
- BloomFilterMergeBufferAggregator mergeAggregator = new BloomFilterMergeBufferAggregator(mergeDim, maxNumValues);
+ BloomFilterMergeAggregator mergeAggregator = new BloomFilterMergeAggregator(mergeDim, maxNumValues, false);
int maxSize = valueAggregatorFactory.getCombiningFactory().getMaxIntermediateSizeWithNulls();
ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
@@ -474,8 +499,9 @@ public class BloomFilterAggregatorTest
bufferAggregateColumn(Collections.singletonList(mergeDim), mergeAggregator, buf, pos);
}
- BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory()
- .finalizeComputation(mergeAggregator.get(buf, pos));
+ BloomKFilter merged = BloomKFilter.deserialize(
+ (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get(buf, pos))
+ );
String serialized = filterToString(merged);
Assert.assertEquals(serializedCombinedFilter, serialized);
@@ -596,14 +622,6 @@ public class BloomFilterAggregatorTest
}
}
- public static class TestBloomFilterColumnSelector extends SteppableSelector<Object>
- {
- public TestBloomFilterColumnSelector(List<Object> values)
- {
- super(values);
- }
- }
-
public static class TestBloomFilterBufferColumnSelector extends SteppableSelector<ByteBuffer>
{
public TestBloomFilterBufferColumnSelector(List<ByteBuffer> values)
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
index a2207f2..ce3b932 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
@@ -42,6 +42,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -108,9 +109,10 @@ public class BloomFilterGroupByQueryTest
MapBasedRow row = ingestAndQuery(query);
- Assert.assertTrue(((BloomKFilter) row.getRaw("blooming_quality")).testString("mezzanine"));
- Assert.assertTrue(((BloomKFilter) row.getRaw("blooming_quality")).testString("premium"));
- Assert.assertFalse(((BloomKFilter) row.getRaw("blooming_quality")).testString("entertainment"));
+ BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("blooming_quality"));
+ Assert.assertTrue(filter.testString("mezzanine"));
+ Assert.assertTrue(filter.testString("premium"));
+ Assert.assertFalse(filter.testString("entertainment"));
}
@Test
@@ -135,7 +137,7 @@ public class BloomFilterGroupByQueryTest
Object val = row.getRaw("blooming_quality");
- String serialized = BloomFilterAggregatorTest.filterToString((BloomKFilter) val);
+ String serialized = BloomFilterAggregatorTest.filterToString(BloomKFilter.deserialize((ByteBuffer) val));
String empty = BloomFilterAggregatorTest.filterToString(filter);
Assert.assertEquals(empty, serialized);
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
index 6b218bb..17241ed 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
@@ -179,7 +179,6 @@ public class BloomFilterSqlAggregatorTest
.rows(CalciteTests.ROWS1_WITH_NUMERIC_DIMS)
.buildMMappedIndex();
-
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
.dataSource(DATA_SOURCE)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 9ca3233..898b19f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -425,8 +425,6 @@ public class CalciteTests
)
);
-
-
public static final List<InputRow> ROWS2 = ImmutableList.of(
createRow("2000-01-01", "דרואיד", "he", 1.0),
createRow("2000-01-01", "druid", "en", 1.0),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org