You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/06/06 21:39:38 UTC

[incubator-druid] branch master updated: add bloom filter fallback aggregator when types are unknown (#7719)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ee0d4ea  add bloom filter fallback aggregator when types are unknown (#7719)
ee0d4ea is described below

commit ee0d4ea589234fb2d0467197939faafdd37804b4
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Thu Jun 6 14:39:32 2019 -0700

    add bloom filter fallback aggregator when types are unknown (#7719)
---
 .../bloom/BloomFilterAggregatorFactory.java        | 149 +++++++++------------
 .../bloom/BloomFilterMergeAggregator.java          |   2 +-
 ...gator.java => ObjectBloomFilterAggregator.java} |  42 +++---
 .../bloom/StringBloomFilterAggregator.java         |   6 +-
 .../bloom/BloomFilterGroupByQueryTest.java         |  79 ++++++++++-
 5 files changed, 175 insertions(+), 103 deletions(-)

diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
index 2c633f2..53839e8 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java
@@ -82,95 +82,13 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
   @Override
   public Aggregator factorize(ColumnSelectorFactory columnFactory)
   {
-    ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
-
-    if (capabilities == null) {
-      BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
-      if (selector instanceof NilColumnValueSelector) {
-        // BloomKFilter must be the same size so we cannot use a constant for the empty agg
-        return new NoopBloomFilterAggregator(maxNumEntries, true);
-      }
-      throw new IAE(
-          "Cannot create bloom filter buffer aggregator for column selector type [%s]",
-          selector.getClass().getName()
-      );
-    }
-    ValueType type = capabilities.getType();
-    switch (type) {
-      case STRING:
-        return new StringBloomFilterAggregator(
-            columnFactory.makeDimensionSelector(field),
-            maxNumEntries,
-            true
-        );
-      case LONG:
-        return new LongBloomFilterAggregator(
-            columnFactory.makeColumnValueSelector(field.getDimension()),
-            maxNumEntries,
-            true
-        );
-      case FLOAT:
-        return new FloatBloomFilterAggregator(
-            columnFactory.makeColumnValueSelector(field.getDimension()),
-            maxNumEntries,
-            true
-        );
-      case DOUBLE:
-        return new DoubleBloomFilterAggregator(
-            columnFactory.makeColumnValueSelector(field.getDimension()),
-            maxNumEntries,
-            true
-        );
-      default:
-        throw new IAE("Cannot create bloom filter aggregator for invalid column type [%s]", type);
-    }
+    return factorizeInternal(columnFactory, true);
   }
 
   @Override
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
   {
-    ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
-
-    if (capabilities == null) {
-      BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
-      if (selector instanceof NilColumnValueSelector) {
-        return new NoopBloomFilterAggregator(maxNumEntries, false);
-      }
-      throw new IAE(
-          "Cannot create bloom filter buffer aggregator for column selector type [%s]",
-          selector.getClass().getName()
-      );
-    }
-
-    ValueType type = capabilities.getType();
-    switch (type) {
-      case STRING:
-        return new StringBloomFilterAggregator(
-            columnFactory.makeDimensionSelector(field),
-            maxNumEntries,
-            false
-        );
-      case LONG:
-        return new LongBloomFilterAggregator(
-            columnFactory.makeColumnValueSelector(field.getDimension()),
-            maxNumEntries,
-            false
-        );
-      case FLOAT:
-        return new FloatBloomFilterAggregator(
-            columnFactory.makeColumnValueSelector(field.getDimension()),
-            maxNumEntries,
-            false
-        );
-      case DOUBLE:
-        return new DoubleBloomFilterAggregator(
-            columnFactory.makeColumnValueSelector(field.getDimension()),
-            maxNumEntries,
-            false
-        );
-      default:
-        throw new IAE("Cannot create bloom filter buffer aggregator for invalid column type [%s]", type);
-    }
+    return factorizeInternal(columnFactory, false);
   }
 
   @Override
@@ -310,4 +228,67 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
            ", maxNumEntries=" + maxNumEntries +
            '}';
   }
+
+  private BaseBloomFilterAggregator factorizeInternal(ColumnSelectorFactory columnFactory, boolean onHeap)
+  {
+    if (field == null || field.getDimension() == null) {
+      return new NoopBloomFilterAggregator(maxNumEntries, onHeap);
+    }
+
+    ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension());
+
+    if (capabilities != null) {
+      ValueType type = capabilities.getType();
+      switch (type) {
+        case STRING:
+          return new StringBloomFilterAggregator(
+              columnFactory.makeDimensionSelector(field),
+              maxNumEntries,
+              onHeap
+          );
+        case LONG:
+          return new LongBloomFilterAggregator(
+              columnFactory.makeColumnValueSelector(field.getDimension()),
+              maxNumEntries,
+              onHeap
+          );
+        case FLOAT:
+          return new FloatBloomFilterAggregator(
+              columnFactory.makeColumnValueSelector(field.getDimension()),
+              maxNumEntries,
+              onHeap
+          );
+        case DOUBLE:
+          return new DoubleBloomFilterAggregator(
+              columnFactory.makeColumnValueSelector(field.getDimension()),
+              maxNumEntries,
+              onHeap
+          );
+        case COMPLEX:
+          // in an ideal world, we would check complex type, but until then assume it's a bloom filter
+          return new BloomFilterMergeAggregator(
+              columnFactory.makeColumnValueSelector(field.getDimension()),
+              maxNumEntries,
+              onHeap
+          );
+        default:
+          throw new IAE(
+              "Cannot create bloom filter %s for invalid column type [%s]",
+              onHeap ? "aggregator" : "buffer aggregator",
+              type
+          );
+      }
+    } else {
+      BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
+      if (selector instanceof NilColumnValueSelector) {
+        return new NoopBloomFilterAggregator(maxNumEntries, onHeap);
+      }
+      // no column capabilities, use fallback 'object' aggregator
+      return new ObjectBloomFilterAggregator(
+          columnFactory.makeColumnValueSelector(field.getDimension()),
+          maxNumEntries,
+          onHeap
+      );
+    }
+  }
 }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
index 011f2f6..6855d83 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
 
 public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<ByteBuffer>>
 {
-  public BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
+  BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
   {
     super(selector, maxNumEntries, onHeap);
   }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java
similarity index 51%
copy from extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
copy to extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java
index f3f6dae..bc97fab 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java
@@ -19,37 +19,47 @@
 
 package org.apache.druid.query.aggregation.bloom;
 
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.query.filter.BloomKFilter;
+import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
 
 import java.nio.ByteBuffer;
 
-public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator<DimensionSelector>
+/**
+ * Handles "unknown" columns by examining what comes out of the selector
+ */
+class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator<ColumnValueSelector>
 {
-
-  StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries, boolean onHeap)
+  ObjectBloomFilterAggregator(
+      ColumnValueSelector selector,
+      int maxNumEntries,
+      boolean onHeap
+  )
   {
     super(selector, maxNumEntries, onHeap);
   }
 
   @Override
-  public void bufferAdd(ByteBuffer buf)
+  void bufferAdd(ByteBuffer buf)
   {
-    if (selector.getRow().size() > 1) {
-      selector.getRow().forEach(v -> {
-        String value = selector.lookupName(v);
-        if (value == null) {
-          BloomKFilter.addBytes(buf, null, 0, 0);
+    final Object object = selector.getObject();
+    if (object instanceof ByteBuffer) {
+      final ByteBuffer other = (ByteBuffer) object;
+      BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position());
+    } else {
+      if (NullHandling.replaceWithDefault() || !selector.isNull()) {
+        if (object instanceof Long) {
+          BloomKFilter.addLong(buf, selector.getLong());
+        } else if (object instanceof Double) {
+          BloomKFilter.addDouble(buf, selector.getDouble());
+        } else if (object instanceof Float) {
+          BloomKFilter.addFloat(buf, selector.getFloat());
         } else {
-          BloomKFilter.addString(buf, value);
+          StringBloomFilterAggregator.stringBufferAdd(buf, (DimensionSelector) selector);
         }
-      });
-    } else {
-      String value = (String) selector.getObject();
-      if (value == null) {
-        BloomKFilter.addBytes(buf, null, 0, 0);
       } else {
-        BloomKFilter.addString(buf, value);
+        BloomKFilter.addBytes(buf, null, 0, 0);
       }
     }
   }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
index f3f6dae..db65ca5 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
 
 public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator<DimensionSelector>
 {
-
   StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries, boolean onHeap)
   {
     super(selector, maxNumEntries, onHeap);
@@ -35,6 +34,11 @@ public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator
   @Override
   public void bufferAdd(ByteBuffer buf)
   {
+    stringBufferAdd(buf, selector);
+  }
+
+  static void stringBufferAdd(ByteBuffer buf, DimensionSelector selector)
+  {
     if (selector.getRow().size() > 1) {
       selector.getRow().forEach(v -> {
         String value = selector.lookupName(v);
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
index ce3b932..5661e7b 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.query.aggregation.AggregationTestHelper;
 import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.segment.TestHelper;
 import org.junit.After;
 import org.junit.Assert;
@@ -61,6 +62,7 @@ public class BloomFilterGroupByQueryTest
   }
 
   private AggregationTestHelper helper;
+  private boolean isV2;
 
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
@@ -72,6 +74,7 @@ public class BloomFilterGroupByQueryTest
         config,
         tempFolder
     );
+    isV2 = config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2);
   }
 
   @Parameterized.Parameters(name = "{0}")
@@ -93,7 +96,6 @@ public class BloomFilterGroupByQueryTest
   @Test
   public void testQuery() throws Exception
   {
-
     String query = "{"
                    + "\"queryType\": \"groupBy\","
                    + "\"dataSource\": \"test_datasource\","
@@ -116,6 +118,81 @@ public class BloomFilterGroupByQueryTest
   }
 
   @Test
+  public void testNestedQuery() throws Exception
+  {
+    if (!isV2) {
+      return;
+    }
+
+    String query = "{"
+                   + "\"queryType\": \"groupBy\","
+                   + "\"dataSource\": {"
+                   + "\"type\": \"query\","
+                   + "\"query\": {"
+                   + "\"queryType\":\"groupBy\","
+                   + "\"dataSource\": \"test_datasource\","
+                   + "\"intervals\": [ \"1970/2050\" ],"
+                   + "\"granularity\":\"ALL\","
+                   + "\"dimensions\":[],"
+                   + "\"aggregations\": [{ \"type\":\"longSum\", \"name\":\"innerSum\", \"fieldName\":\"count\"}]"
+                   + "}"
+                   + "},"
+                   + "\"granularity\": \"ALL\","
+                   + "\"dimensions\": [],"
+                   + "\"aggregations\": ["
+                   + "  { \"type\": \"bloom\", \"name\": \"bloom\", \"field\": \"innerSum\" }"
+                   + "],"
+                   + "\"intervals\": [ \"1970/2050\" ]"
+                   + "}";
+
+    MapBasedRow row = ingestAndQuery(query);
+
+
+    BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("bloom"));
+    Assert.assertTrue(filter.testLong(13L));
+    Assert.assertFalse(filter.testLong(5L));
+  }
+
+
+  @Test
+  public void testNestedQueryComplex() throws Exception
+  {
+    if (!isV2) {
+      return;
+    }
+
+    String query = "{"
+                   + "\"queryType\": \"groupBy\","
+                   + "\"dataSource\": {"
+                   + "\"type\": \"query\","
+                   + "\"query\": {"
+                   + "\"queryType\":\"groupBy\","
+                   + "\"dataSource\": \"test_datasource\","
+                   + "\"intervals\": [ \"1970/2050\" ],"
+                   + "\"granularity\":\"ALL\","
+                   + "\"dimensions\":[],"
+                   + "\"filter\":{ \"type\":\"selector\", \"dimension\":\"market\", \"value\":\"upfront\"},"
+                   + "\"aggregations\": [{ \"type\":\"bloom\", \"name\":\"innerBloom\", \"field\":\"quality\"}]"
+                   + "}"
+                   + "},"
+                   + "\"granularity\": \"ALL\","
+                   + "\"dimensions\": [],"
+                   + "\"aggregations\": ["
+                   + "  { \"type\": \"bloom\", \"name\": \"innerBloom\", \"field\": \"innerBloom\" }"
+                   + "],"
+                   + "\"intervals\": [ \"1970/2050\" ]"
+                   + "}";
+
+    MapBasedRow row = ingestAndQuery(query);
+
+
+    BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("innerBloom"));
+    Assert.assertTrue(filter.testString("mezzanine"));
+    Assert.assertTrue(filter.testString("premium"));
+    Assert.assertFalse(filter.testString("entertainment"));
+  }
+
+  @Test
   public void testQueryFakeDimension() throws Exception
   {
     String query = "{"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org