You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this copy.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/07/24 02:25:55 UTC
[incubator-druid] 02/14: add bloom filter fallback aggregator when types are unknown (#7719)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.15.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 2abb65266131c63670019ba38f55a12471b0dbbc
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Thu Jun 6 14:39:32 2019 -0700
add bloom filter fallback aggregator when types are unknown (#7719)
---
.../bloom/BloomFilterAggregatorFactory.java | 149 +++++++++------------
.../bloom/BloomFilterMergeAggregator.java | 2 +-
...gator.java => ObjectBloomFilterAggregator.java} | 42 +++---
.../bloom/StringBloomFilterAggregator.java | 6 +-
.../bloom/BloomFilterGroupByQueryTest.java | 79 ++++++++++-
5 files changed, 175 insertions(+), 103 deletions(-)
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
index 2c633f2..53839e8 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
@@ -82,95 +82,13 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
@Override
public Aggregator factorize(ColumnSelectorFactory columnFactory)
{
- ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
-
- if (capabilities == null) {
- BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
- if (selector instanceof NilColumnValueSelector) {
- // BloomKFilter must be the same size so we cannot use a constant for the empty agg
- return new NoopBloomFilterAggregator(maxNumEntries, true);
- }
- throw new IAE(
- "Cannot create bloom filter buffer aggregator for column selector type [%s]",
- selector.getClass().getName()
- );
- }
- ValueType type = capabilities.getType();
- switch (type) {
- case STRING:
- return new StringBloomFilterAggregator(
- columnFactory.makeDimensionSelector(field),
- maxNumEntries,
- true
- );
- case LONG:
- return new LongBloomFilterAggregator(
- columnFactory.makeColumnValueSelector(field.getDimension()),
- maxNumEntries,
- true
- );
- case FLOAT:
- return new FloatBloomFilterAggregator(
- columnFactory.makeColumnValueSelector(field.getDimension()),
- maxNumEntries,
- true
- );
- case DOUBLE:
- return new DoubleBloomFilterAggregator(
- columnFactory.makeColumnValueSelector(field.getDimension()),
- maxNumEntries,
- true
- );
- default:
- throw new IAE("Cannot create bloom filter aggregator for invalid column type [%s]", type);
- }
+ return factorizeInternal(columnFactory, true);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
{
- ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
-
- if (capabilities == null) {
- BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
- if (selector instanceof NilColumnValueSelector) {
- return new NoopBloomFilterAggregator(maxNumEntries, false);
- }
- throw new IAE(
- "Cannot create bloom filter buffer aggregator for column selector type [%s]",
- selector.getClass().getName()
- );
- }
-
- ValueType type = capabilities.getType();
- switch (type) {
- case STRING:
- return new StringBloomFilterAggregator(
- columnFactory.makeDimensionSelector(field),
- maxNumEntries,
- false
- );
- case LONG:
- return new LongBloomFilterAggregator(
- columnFactory.makeColumnValueSelector(field.getDimension()),
- maxNumEntries,
- false
- );
- case FLOAT:
- return new FloatBloomFilterAggregator(
- columnFactory.makeColumnValueSelector(field.getDimension()),
- maxNumEntries,
- false
- );
- case DOUBLE:
- return new DoubleBloomFilterAggregator(
- columnFactory.makeColumnValueSelector(field.getDimension()),
- maxNumEntries,
- false
- );
- default:
- throw new IAE("Cannot create bloom filter buffer aggregator for invalid column type [%s]", type);
- }
+ return factorizeInternal(columnFactory, false);
}
@Override
@@ -310,4 +228,67 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
", maxNumEntries=" + maxNumEntries +
'}';
}
+
+ private BaseBloomFilterAggregator factorizeInternal(ColumnSelectorFactory columnFactory, boolean onHeap)
+ {
+ if (field == null || field.getDimension() == null) {
+ return new NoopBloomFilterAggregator(maxNumEntries, onHeap);
+ }
+
+ ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
+
+ if (capabilities != null) {
+ ValueType type = capabilities.getType();
+ switch (type) {
+ case STRING:
+ return new StringBloomFilterAggregator(
+ columnFactory.makeDimensionSelector(field),
+ maxNumEntries,
+ onHeap
+ );
+ case LONG:
+ return new LongBloomFilterAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ onHeap
+ );
+ case FLOAT:
+ return new FloatBloomFilterAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ onHeap
+ );
+ case DOUBLE:
+ return new DoubleBloomFilterAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ onHeap
+ );
+ case COMPLEX:
+ // in an ideal world, we would check complex type, but until then assume it's a bloom filter
+ return new BloomFilterMergeAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ onHeap
+ );
+ default:
+ throw new IAE(
+ "Cannot create bloom filter %s for invalid column type [%s]",
+ onHeap ? "aggregator" : "buffer aggregator",
+ type
+ );
+ }
+ } else {
+ BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
+ if (selector instanceof NilColumnValueSelector) {
+ return new NoopBloomFilterAggregator(maxNumEntries, onHeap);
+ }
+ // no column capabilities, use fallback 'object' aggregator
+ return new ObjectBloomFilterAggregator(
+ columnFactory.makeColumnValueSelector(field.getDimension()),
+ maxNumEntries,
+ onHeap
+ );
+ }
+ }
}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
index 011f2f6..6855d83 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<ByteBuffer>>
{
- public BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
+ BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
{
super(selector, maxNumEntries, onHeap);
}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java
similarity index 51%
copy from extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
copy to extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java
index f3f6dae..bc97fab 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java
@@ -19,38 +19,88 @@
 
 package org.apache.druid.query.aggregation.bloom;
 
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.query.filter.BloomKFilter;
+import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 
-public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator<DimensionSelector>
+/**
+ * Handles "unknown" columns by examining what comes out of the selector
+ */
+class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator<ColumnValueSelector>
 {
-
-  StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries, boolean onHeap)
+  ObjectBloomFilterAggregator(
+      ColumnValueSelector selector,
+      int maxNumEntries,
+      boolean onHeap
+  )
   {
     super(selector, maxNumEntries, onHeap);
   }
 
   @Override
-  public void bufferAdd(ByteBuffer buf)
+  void bufferAdd(ByteBuffer buf)
   {
-    if (selector.getRow().size() > 1) {
-      selector.getRow().forEach(v -> {
-        String value = selector.lookupName(v);
-        if (value == null) {
-          BloomKFilter.addBytes(buf, null, 0, 0);
+    final Object object = selector.getObject();
+    if (object instanceof ByteBuffer) {
+      final ByteBuffer other = (ByteBuffer) object;
+      BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position());
+    } else {
+      if (NullHandling.replaceWithDefault() || !selector.isNull()) {
+        if (object instanceof Long) {
+          BloomKFilter.addLong(buf, selector.getLong());
+        } else if (object instanceof Double) {
+          BloomKFilter.addDouble(buf, selector.getDouble());
+        } else if (object instanceof Float) {
+          BloomKFilter.addFloat(buf, selector.getFloat());
+        } else if (selector instanceof DimensionSelector) {
+          // a genuine dimension selector: reuse the string path, which also handles multi-value rows
+          StringBloomFilterAggregator.stringBufferAdd(buf, (DimensionSelector) selector);
         } else {
-          BloomKFilter.addString(buf, value);
+          // other selectors (e.g. over the rows of a nested query) are not DimensionSelectors, and
+          // casting them would throw ClassCastException; add the raw object as string value(s) instead
+          addObjectAsString(buf, object);
         }
-      });
-    } else {
-      String value = (String) selector.getObject();
-      if (value == null) {
-        BloomKFilter.addBytes(buf, null, 0, 0);
       } else {
-        BloomKFilter.addString(buf, value);
+        BloomKFilter.addBytes(buf, null, 0, 0);
       }
     }
   }
+
+  /**
+   * Adds a value read from a selector that is not a {@link DimensionSelector}. Such a selector (for
+   * example one over a nested query's rows, whose column capabilities are unknown) may yield a single
+   * value, a multi-value {@link List}, or null, so the value is interpreted here rather than by casting.
+   */
+  private static void addObjectAsString(ByteBuffer buf, Object object)
+  {
+    if (object instanceof List) {
+      final List<?> values = (List<?>) object;
+      if (values.isEmpty()) {
+        // an empty multi-value row is recorded as a single null entry
+        BloomKFilter.addBytes(buf, null, 0, 0);
+      } else {
+        for (Object value : values) {
+          addStringOrNull(buf, value);
+        }
+      }
+    } else {
+      addStringOrNull(buf, object);
+    }
+  }
+
+  /**
+   * Adds {@code value} as a string, or as a null entry when it is null.
+   */
+  private static void addStringOrNull(ByteBuffer buf, Object value)
+  {
+    if (value == null) {
+      BloomKFilter.addBytes(buf, null, 0, 0);
+    } else {
+      BloomKFilter.addString(buf, String.valueOf(value));
+    }
+  }
 }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
index f3f6dae..db65ca5 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator<DimensionSelector>
{
-
StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries, boolean onHeap)
{
super(selector, maxNumEntries, onHeap);
@@ -35,6 +34,11 @@ public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator
@Override
public void bufferAdd(ByteBuffer buf)
{
+ stringBufferAdd(buf, selector);
+ }
+
+ static void stringBufferAdd(ByteBuffer buf, DimensionSelector selector)
+ {
if (selector.getRow().size() > 1) {
selector.getRow().forEach(v -> {
String value = selector.lookupName(v);
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
index ce3b932..5661e7b 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.segment.TestHelper;
import org.junit.After;
import org.junit.Assert;
@@ -61,6 +62,7 @@ public class BloomFilterGroupByQueryTest
}
private AggregationTestHelper helper;
+ private boolean isV2;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@@ -72,6 +74,7 @@ public class BloomFilterGroupByQueryTest
config,
tempFolder
);
+ isV2 = config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2);
}
@Parameterized.Parameters(name = "{0}")
@@ -93,7 +96,6 @@ public class BloomFilterGroupByQueryTest
@Test
public void testQuery() throws Exception
{
-
String query = "{"
+ "\"queryType\": \"groupBy\","
+ "\"dataSource\": \"test_datasource\","
@@ -116,6 +118,81 @@ public class BloomFilterGroupByQueryTest
}
@Test
+ public void testNestedQuery() throws Exception
+ {
+ if (!isV2) {
+ return;
+ }
+
+ String query = "{"
+ + "\"queryType\": \"groupBy\","
+ + "\"dataSource\": {"
+ + "\"type\": \"query\","
+ + "\"query\": {"
+ + "\"queryType\":\"groupBy\","
+ + "\"dataSource\": \"test_datasource\","
+ + "\"intervals\": [ \"1970/2050\" ],"
+ + "\"granularity\":\"ALL\","
+ + "\"dimensions\":[],"
+ + "\"aggregations\": [{ \"type\":\"longSum\", \"name\":\"innerSum\", \"fieldName\":\"count\"}]"
+ + "}"
+ + "},"
+ + "\"granularity\": \"ALL\","
+ + "\"dimensions\": [],"
+ + "\"aggregations\": ["
+ + " { \"type\": \"bloom\", \"name\": \"bloom\", \"field\": \"innerSum\" }"
+ + "],"
+ + "\"intervals\": [ \"1970/2050\" ]"
+ + "}";
+
+ MapBasedRow row = ingestAndQuery(query);
+
+
+ BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("bloom"));
+ Assert.assertTrue(filter.testLong(13L));
+ Assert.assertFalse(filter.testLong(5L));
+ }
+
+
+ @Test
+ public void testNestedQueryComplex() throws Exception
+ {
+ if (!isV2) {
+ return;
+ }
+
+ String query = "{"
+ + "\"queryType\": \"groupBy\","
+ + "\"dataSource\": {"
+ + "\"type\": \"query\","
+ + "\"query\": {"
+ + "\"queryType\":\"groupBy\","
+ + "\"dataSource\": \"test_datasource\","
+ + "\"intervals\": [ \"1970/2050\" ],"
+ + "\"granularity\":\"ALL\","
+ + "\"dimensions\":[],"
+ + "\"filter\":{ \"type\":\"selector\", \"dimension\":\"market\", \"value\":\"upfront\"},"
+ + "\"aggregations\": [{ \"type\":\"bloom\", \"name\":\"innerBloom\", \"field\":\"quality\"}]"
+ + "}"
+ + "},"
+ + "\"granularity\": \"ALL\","
+ + "\"dimensions\": [],"
+ + "\"aggregations\": ["
+ + " { \"type\": \"bloom\", \"name\": \"innerBloom\", \"field\": \"innerBloom\" }"
+ + "],"
+ + "\"intervals\": [ \"1970/2050\" ]"
+ + "}";
+
+ MapBasedRow row = ingestAndQuery(query);
+
+
+ BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("innerBloom"));
+ Assert.assertTrue(filter.testString("mezzanine"));
+ Assert.assertTrue(filter.testString("premium"));
+ Assert.assertFalse(filter.testString("entertainment"));
+ }
+
+ @Test
public void testQueryFakeDimension() throws Exception
{
String query = "{"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org