Posted to commits@druid.apache.org by jo...@apache.org on 2019/02/01 21:54:51 UTC

[incubator-druid] branch master updated: bloom filter sql aggregator (#6950)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a5827e  bloom filter sql aggregator (#6950)
7a5827e is described below

commit 7a5827e12eb65eef80e08fe86ef76604019d6af8
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Fri Feb 1 13:54:46 2019 -0800

    bloom filter sql aggregator (#6950)
    
    * adds sql aggregator for bloom filter, adds complex value serde for sql results
    
    * fix tests
    
    * checkstyle
    
    * fix copy-paste
---
 docs/content/configuration/index.md                |   1 +
 .../development/extensions-core/bloom-filter.md    |  25 +-
 docs/content/querying/sql.md                       |   1 +
 .../druid/guice/BloomFilterExtensionModule.java    |   3 +-
 .../bloom/sql/BloomFilterSqlAggregator.java        | 212 +++++++
 .../apache/druid/query/filter/BloomKFilter.java    |   2 +-
 .../bloom/sql/BloomFilterSqlAggregatorTest.java    | 642 +++++++++++++++++++++
 .../druid/sql/calcite/planner/PlannerConfig.java   |  14 +-
 .../apache/druid/sql/calcite/rel/QueryMaker.java   |  14 +-
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |   8 +
 .../apache/druid/sql/calcite/CalciteQueryTest.java |  17 +-
 .../druid/sql/calcite/http/SqlResourceTest.java    |   9 +-
 12 files changed, 936 insertions(+), 12 deletions(-)

diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 990f9ce..221cccd 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1418,6 +1418,7 @@ The Druid SQL server is configured through the following properties on the Broke
 |`druid.sql.planner.useFallback`|Whether to evaluate operations on the Broker when they cannot be expressed as Druid queries. This option is not recommended for production since it can generate unscalable query plans. If false, SQL queries that cannot be translated to Druid queries will fail.|false|
 |`druid.sql.planner.requireTimeCondition`|Whether to require SQL queries to have filter conditions on the __time column, so that all generated native queries will have user-specified intervals. If true, all queries without a filter condition on the __time column will fail.|false|
 |`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC|
+|`druid.sql.planner.serializeComplexValues`|Whether to serialize "complex" output values. When false, the class name is returned instead of the serialized value.|true|
 
 #### Broker Caching
 
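For orientation, a minimal Broker configuration fragment exercising the new property might look like the following. This is illustrative only; `druid.sql.enable` is the existing toggle for Druid SQL.

```
# Illustrative broker runtime.properties fragment
druid.sql.enable=true
# New in this commit: serialize "complex" output values (e.g. bloom filters)
# rather than returning the value's class name. Defaults to true.
druid.sql.planner.serializeComplexValues=true
```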
diff --git a/docs/content/development/extensions-core/bloom-filter.md b/docs/content/development/extensions-core/bloom-filter.md
index f878e75..651cc30 100644
--- a/docs/content/development/extensions-core/bloom-filter.md
+++ b/docs/content/development/extensions-core/bloom-filter.md
@@ -89,9 +89,9 @@ This string can then be used in the native or sql Druid query.
 
 Note: `org.apache.hive.common.util.BloomKFilter` provides a serialize method which can be used to serialize bloom filters to an output stream.
 
-### SQL Queries
+### Filtering SQL Queries
 
-Bloom filters are supported in SQL via the `bloom_filter_test` operator:
+Bloom filters can be used in SQL `WHERE` clauses via the `bloom_filter_test` operator:
 
 ```sql
 SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(<expr>, '<serialized_bytes_for_BloomKFilter>')
@@ -108,7 +108,11 @@ bloom_filter_test(<expr>, '<serialized_bytes_for_BloomKFilter>')
 
 ## Bloom Filter Query Aggregator
 
-Input for a `bloomKFilter` can also be created from a druid query with the `bloom` aggregator.
+Input for a `bloomKFilter` can also be created from a druid query with the `bloom` aggregator. Note that it is very
+important to set a reasonable value for the `maxNumEntries` parameter, which is the maximum number of distinct entries
+that the bloom filter can represent without increasing the false positive rate. It may be worth performing a query using
+one of the unique count sketches to calculate a value for this parameter, in order to build a bloom filter appropriate
+for the query.
 
 ### JSON Specification of Bloom Filter Aggregator
 
@@ -157,8 +161,19 @@ response
 [{"timestamp":"2015-09-12T00:00:00.000Z","result":{"userBloom":"BAAAJhAAAA..."}}]
 ```
 
-These values can then be set in the filter specification above. 
+These values can then be set in the filter specification described above. 
 
 Ordering results by a bloom filter aggregator, for example in a TopN query, will perform a comparatively expensive 
 linear scan _of the filter itself_ to count the number of set bits as a means of approximating how many items have been 
-added to the set. As such, ordering by an alternate aggregation is recommended if possible. 
\ No newline at end of file
+added to the set. As such, ordering by an alternate aggregation is recommended if possible. 
+
+
+### SQL Bloom Filter Aggregator
+Bloom filters can be computed in SQL expressions with the `BLOOM_FILTER` aggregator:
+
+```sql
+SELECT BLOOM_FILTER(<expression>, <max number of entries>) FROM druid.foo WHERE dim2 = 'abc'
+```
+
+This requires the `druid.sql.planner.serializeComplexValues` setting to be `true`. Bloom filter results in a SQL
+response are serialized into a base64 string, which can then be used in subsequent queries as a filter.
\ No newline at end of file
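To make the round trip described above concrete, here is a Java sketch based on the test code in this commit: it builds a `BloomKFilter`, serializes it with an `ObjectMapper` that has `BloomFilterSerializersModule` registered (the same serde the extension uses, which yields the string form that `BLOOM_FILTER` returns in SQL results), and splices the result into a `bloom_filter_test` query. The class name, sample values, and query string are illustrative.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.query.filter.BloomKFilter;

public class BloomFilterRoundTripSketch
{
  public static void main(String[] args) throws Exception
  {
    // Size for the expected number of distinct values; an undersized filter
    // raises the false positive rate. A prior APPROX_COUNT_DISTINCT query is
    // one way to estimate this, per the sizing note above.
    BloomKFilter filter = new BloomKFilter(1000);
    filter.addString("abc");
    filter.addString("def");

    // Mirrors the test setup in this commit: register the extension's serde
    // module so the filter serializes the same way BLOOM_FILTER results do.
    ObjectMapper mapper = new ObjectMapper()
        .registerModule(new BloomFilterSerializersModule());
    String serialized = mapper.writeValueAsString(filter); // JSON string, e.g. "BAAAJhAAAA..."

    // Strip the surrounding JSON quotes before splicing into a follow-up query.
    String base64 = serialized.substring(1, serialized.length() - 1);
    String sql = "SELECT COUNT(*) FROM druid.foo"
                 + " WHERE bloom_filter_test(dim1, '" + base64 + "')";
    System.out.println(sql);
  }
}
```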
diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 6c9674c..358c1ec 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -120,6 +120,7 @@ Only the COUNT aggregation can accept DISTINCT.
 |`AVG(expr)`|Averages numbers.|
 |`APPROX_COUNT_DISTINCT(expr)`|Counts distinct values of expr, which can be a regular column or a hyperUnique column. This is always approximate, regardless of the value of "useApproximateCountDistinct". See also `COUNT(DISTINCT expr)`.|
 |`APPROX_QUANTILE(expr, probability, [resolution])`|Computes approximate quantiles on numeric or approxHistogram exprs. The "probability" should be between 0 and 1 (exclusive). The "resolution" is the number of centroids to use for the computation. Higher resolutions will give more precise results but also have higher overhead. If not provided, the default resolution is 50. The [approximate histogram extension](../development/extensions-core/approximate-histograms.html) must be loaded to [...]
+|`BLOOM_FILTER(expr, numEntries)`|Computes a bloom filter from values produced by `expr`, with `numEntries` as the maximum number of distinct values before the false positive rate increases. See the [bloom filter extension](../development/extensions-core/bloom-filter.html) documentation for additional details.|
 
 ### Numeric functions
 
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java
index 3244889..8bef477 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java
@@ -22,6 +22,7 @@ package org.apache.druid.guice;
 import com.fasterxml.jackson.databind.Module;
 import com.google.inject.Binder;
 import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.query.aggregation.bloom.sql.BloomFilterSqlAggregator;
 import org.apache.druid.query.expressions.BloomFilterExprMacro;
 import org.apache.druid.query.filter.sql.BloomFilterOperatorConversion;
 import org.apache.druid.sql.guice.SqlBindings;
@@ -42,7 +43,7 @@ public class BloomFilterExtensionModule implements DruidModule
   public void configure(Binder binder)
   {
     SqlBindings.addOperatorConversion(binder, BloomFilterOperatorConversion.class);
-
+    SqlBindings.addAggregator(binder, BloomFilterSqlAggregator.class);
     ExpressionModule.addExprMacro(binder, BloomFilterExprMacro.class);
   }
 }
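The one-line binding above is the whole integration surface: an extension exposes a SQL aggregator by registering it through `SqlBindings` in its `DruidModule`. A sketch of the pattern, where `MyExtensionModule` and `MySqlAggregator` are hypothetical names:

```java
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.sql.guice.SqlBindings;

import java.util.Collections;
import java.util.List;

public class MyExtensionModule implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules()
  {
    // No custom serde in this sketch; real extensions often register some.
    return Collections.emptyList();
  }

  @Override
  public void configure(Binder binder)
  {
    // Makes the aggregator visible to the SQL planner's operator table, as
    // BloomFilterExtensionModule does above with BloomFilterSqlAggregator.
    // MySqlAggregator stands in for any SqlAggregator implementation.
    SqlBindings.addAggregator(binder, MySqlAggregator.class);
  }
}
```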
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
new file mode 100644
index 0000000..0a37dad
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.bloom.sql;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.bloom.BloomFilterAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.dimension.ExtractionDimensionSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.table.RowSignature;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BloomFilterSqlAggregator implements SqlAggregator
+{
+  private static final SqlAggFunction FUNCTION_INSTANCE = new BloomFilterSqlAggFunction();
+  private static final String NAME = "BLOOM_FILTER";
+
+  @Override
+  public SqlAggFunction calciteFunction()
+  {
+    return FUNCTION_INSTANCE;
+  }
+
+  @Nullable
+  @Override
+  public Aggregation toDruidAggregation(
+      PlannerContext plannerContext,
+      RowSignature rowSignature,
+      RexBuilder rexBuilder,
+      String name,
+      AggregateCall aggregateCall,
+      Project project,
+      List<Aggregation> existingAggregations,
+      boolean finalizeAggregations
+  )
+  {
+    final RexNode inputOperand = Expressions.fromFieldAccess(
+        rowSignature,
+        project,
+        aggregateCall.getArgList().get(0)
+    );
+    final DruidExpression input = Expressions.toDruidExpression(
+        plannerContext,
+        rowSignature,
+        inputOperand
+    );
+    if (input == null) {
+      return null;
+    }
+
+    final AggregatorFactory aggregatorFactory;
+    final String aggName = StringUtils.format("%s:agg", name);
+    final RexNode maxNumEntriesOperand = Expressions.fromFieldAccess(
+        rowSignature,
+        project,
+        aggregateCall.getArgList().get(1)
+    );
+
+    if (!maxNumEntriesOperand.isA(SqlKind.LITERAL)) {
+      // maxNumEntriesOperand must be a literal in order to plan.
+      return null;
+    }
+
+    final int maxNumEntries = ((Number) RexLiteral.value(maxNumEntriesOperand)).intValue();
+
+    // Look for existing matching aggregatorFactory.
+    for (final Aggregation existing : existingAggregations) {
+      for (AggregatorFactory factory : existing.getAggregatorFactories()) {
+        if (factory instanceof BloomFilterAggregatorFactory) {
+          final BloomFilterAggregatorFactory theFactory = (BloomFilterAggregatorFactory) factory;
+
+          // Check input for equivalence.
+          final boolean inputMatches;
+          final VirtualColumn virtualInput =
+              existing.getVirtualColumns()
+                      .stream()
+                      .filter(virtualColumn ->
+                                  virtualColumn.getOutputName().equals(theFactory.getField().getOutputName())
+                      )
+                      .findFirst()
+                      .orElse(null);
+
+          if (virtualInput == null) {
+            if (input.isDirectColumnAccess()) {
+              inputMatches =
+                  input.getDirectColumn().equals(theFactory.getField().getDimension());
+            } else {
+              inputMatches =
+                  input.getSimpleExtraction().getColumn().equals(theFactory.getField().getDimension()) &&
+                  input.getSimpleExtraction().getExtractionFn().equals(theFactory.getField().getExtractionFn());
+            }
+          } else {
+            inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression().equals(input.getExpression());
+          }
+
+          final boolean matches = inputMatches && theFactory.getMaxNumEntries() == maxNumEntries;
+
+          if (matches) {
+            // Found existing one. Use this.
+            return Aggregation.create(
+                theFactory
+            );
+          }
+        }
+      }
+    }
+
+    // No existing match found. Create a new one.
+    final List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+    ValueType valueType = Calcites.getValueTypeForSqlTypeName(inputOperand.getType().getSqlTypeName());
+    final DimensionSpec spec;
+    if (input.isDirectColumnAccess()) {
+      spec = new DefaultDimensionSpec(
+          input.getSimpleExtraction().getColumn(),
+          StringUtils.format("%s:%s", name, input.getSimpleExtraction().getColumn()),
+          valueType
+      );
+    } else if (input.isSimpleExtraction()) {
+      spec = new ExtractionDimensionSpec(
+          input.getSimpleExtraction().getColumn(),
+          StringUtils.format("%s:%s", name, input.getSimpleExtraction().getColumn()),
+          valueType,
+          input.getSimpleExtraction().getExtractionFn()
+      );
+    } else {
+      final ExpressionVirtualColumn virtualColumn = input.toVirtualColumn(
+          StringUtils.format("%s:v", aggName),
+          valueType,
+          plannerContext.getExprMacroTable()
+      );
+      virtualColumns.add(virtualColumn);
+      spec = new DefaultDimensionSpec(virtualColumn.getOutputName(), virtualColumn.getOutputName());
+    }
+
+    aggregatorFactory = new BloomFilterAggregatorFactory(
+        aggName,
+        spec,
+        maxNumEntries
+    );
+
+    return Aggregation.create(
+        virtualColumns,
+        aggregatorFactory
+    );
+  }
+
+  private static class BloomFilterSqlAggFunction extends SqlAggFunction
+  {
+    private static final String SIGNATURE1 = "'" + NAME + "(column, maxNumEntries)'\n";
+
+    BloomFilterSqlAggFunction()
+    {
+      super(
+          NAME,
+          null,
+          SqlKind.OTHER_FUNCTION,
+          ReturnTypes.explicit(SqlTypeName.OTHER),
+          null,
+          OperandTypes.and(
+              OperandTypes.sequence(SIGNATURE1, OperandTypes.ANY, OperandTypes.LITERAL),
+              OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC)
+          ),
+          SqlFunctionCategory.USER_DEFINED_FUNCTION,
+          false,
+          false
+      );
+    }
+  }
+}
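For orientation, the planning path above means a query like `SELECT BLOOM_FILTER(dim1, 1000) FROM numfoo` produces a native aggregation equivalent to the following, taken from the expectations in the test class added below; `a0` is the planner-assigned aggregator name:

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.bloom.BloomFilterAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;

class BloomFilterPlanExample
{
  // Native aggregation planned for BLOOM_FILTER(dim1, 1000), per the tests
  // below. maxNumEntries must be a literal, or toDruidAggregation returns
  // null and planning fails.
  static final AggregatorFactory EXPECTED = new BloomFilterAggregatorFactory(
      "a0:agg",                                    // aggName = "<name>:agg"
      new DefaultDimensionSpec("dim1", "a0:dim1"), // direct column access case
      1000                                         // maxNumEntries
  );
}
```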
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java
index 12533a2..090756a 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java
@@ -33,7 +33,7 @@ import java.util.Arrays;
 
 /**
  * This is a direct modification of the Apache Hive 'BloomKFilter', found at:
- * https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
+ * https://github.com/apache/hive/blob/rel/storage-release-2.7.0/storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
  * modified to store variables which are re-used instead of re-allocated per call as {@link ThreadLocal} so multiple
 * threads can share the same filter object. Note that this is a snapshot at hive-storage-api version 2.7.0; later
  * versions break compatibility with how int/float are stored in a bloom filter in this commit:
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
new file mode 100644
index 0000000..4641e27
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.bloom.sql;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.guice.BloomFilterSerializersModule;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.bloom.BloomFilterAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.ExtractionDimensionSpec;
+import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.extraction.SubstringDimExtractionFn;
+import org.apache.druid.query.filter.BloomKFilter;
+import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.sql.SqlLifecycle;
+import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.planner.PlannerFactory;
+import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.SystemSchema;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.apache.druid.sql.calcite.util.QueryLogHook;
+import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class BloomFilterSqlAggregatorTest
+{
+  private static final int TEST_NUM_ENTRIES = 1000;
+  private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
+  private static final Injector injector = Guice.createInjector(
+      binder -> {
+        binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
+        binder.bind(LookupReferencesManager.class).toInstance(
+            LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
+                ImmutableMap.of(
+                    "a", "xa",
+                    "abc", "xabc"
+                )
+            )
+        );
+      }
+  );
+
+  private static ObjectMapper jsonMapper =
+      injector
+          .getInstance(Key.get(ObjectMapper.class, Json.class))
+          .registerModules(Collections.singletonList(new BloomFilterSerializersModule()));
+
+  private static final String DATA_SOURCE = "numfoo";
+
+  private static QueryRunnerFactoryConglomerate conglomerate;
+  private static Closer resourceCloser;
+
+  @BeforeClass
+  public static void setUpClass()
+  {
+    final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
+        .createQueryRunnerFactoryConglomerate();
+    conglomerate = conglomerateCloserPair.lhs;
+    resourceCloser = conglomerateCloserPair.rhs;
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws IOException
+  {
+    resourceCloser.close();
+  }
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public QueryLogHook queryLogHook = QueryLogHook.create(jsonMapper);
+
+  private SpecificSegmentsQuerySegmentWalker walker;
+  private SqlLifecycleFactory sqlLifecycleFactory;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    InputRowParser parser = new MapInputRowParser(
+        new TimeAndDimsParseSpec(
+            new TimestampSpec("t", "iso", null),
+            new DimensionsSpec(
+                ImmutableList.<DimensionSchema>builder()
+                    .addAll(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3")))
+                    .add(new DoubleDimensionSchema("d1"))
+                    .add(new FloatDimensionSchema("f1"))
+                    .add(new LongDimensionSchema("l1"))
+                    .build(),
+                null,
+                null
+            )
+        ));
+
+    final QueryableIndex index =
+        IndexBuilder.create()
+                    .tmpDir(temporaryFolder.newFolder())
+                    .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                    .schema(
+                        new IncrementalIndexSchema.Builder()
+                            .withMetrics(
+                                new CountAggregatorFactory("cnt"),
+                                new DoubleSumAggregatorFactory("m1", "m1")
+                            )
+                            .withDimensionsSpec(parser)
+                            .withRollup(false)
+                            .build()
+                    )
+                    .rows(CalciteTests.ROWS1_WITH_NUMERIC_DIMS)
+                    .buildMMappedIndex();
+
+
+    walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
+        DataSegment.builder()
+                   .dataSource(DATA_SOURCE)
+                   .interval(index.getDataInterval())
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .build(),
+        index
+    );
+
+    final PlannerConfig plannerConfig = new PlannerConfig();
+    final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
+    final DruidOperatorTable operatorTable = new DruidOperatorTable(
+        ImmutableSet.of(new BloomFilterSqlAggregator()),
+        ImmutableSet.of()
+    );
+
+    sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
+        new PlannerFactory(
+            druidSchema,
+            systemSchema,
+            CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+            operatorTable,
+            CalciteTests.createExprMacroTable(),
+            plannerConfig,
+            AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+            jsonMapper
+        )
+    );
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    walker.close();
+    walker = null;
+  }
+
+  @Test
+  public void testBloomFilterAgg() throws Exception
+  {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    final String sql = "SELECT\n"
+                       + "BLOOM_FILTER(dim1, 1000)\n"
+                       + "FROM numfoo";
+
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
+
+    BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
+    for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
+      String raw = NullHandling.emptyToNullIfNeeded((String) row.getRaw("dim1"));
+      if (raw == null) {
+        expected1.addBytes(null, 0, 0);
+      } else {
+        expected1.addString(raw);
+      }
+    }
+
+    final List<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{
+            jsonMapper.writeValueAsString(expected1)
+        }
+    );
+    Assert.assertEquals(expectedResults.size(), results.size());
+    for (int i = 0; i < expectedResults.size(); i++) {
+      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
+    }
+
+    Assert.assertEquals(
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(CalciteTests.DATASOURCE3)
+              .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+              .granularity(Granularities.ALL)
+              .aggregators(
+                  ImmutableList.of(
+                    new BloomFilterAggregatorFactory(
+                        "a0:agg",
+                        new DefaultDimensionSpec("dim1", "a0:dim1"),
+                        TEST_NUM_ENTRIES
+                    )
+                  )
+              )
+              .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
+              .build(),
+        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+    );
+  }
+
+  @Test
+  public void testBloomFilterTwoAggs() throws Exception
+  {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    final String sql = "SELECT\n"
+                       + "BLOOM_FILTER(dim1, 1000),\n"
+                       + "BLOOM_FILTER(dim2, 1000)\n"
+                       + "FROM numfoo";
+
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
+
+    BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
+    BloomKFilter expected2 = new BloomKFilter(TEST_NUM_ENTRIES);
+    for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
+      String raw = NullHandling.emptyToNullIfNeeded((String) row.getRaw("dim1"));
+      if (raw == null) {
+        expected1.addBytes(null, 0, 0);
+      } else {
+        expected1.addString(raw);
+      }
+      List<String> lst = row.getDimension("dim2");
+      if (lst.size() == 0) {
+        expected2.addBytes(null, 0, 0);
+      }
+      for (String s : lst) {
+        String val = NullHandling.emptyToNullIfNeeded(s);
+        if (val == null) {
+          expected2.addBytes(null, 0, 0);
+        } else {
+          expected2.addString(val);
+        }
+      }
+    }
+
+    final List<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{
+            jsonMapper.writeValueAsString(expected1),
+            jsonMapper.writeValueAsString(expected2)
+        }
+    );
+    Assert.assertEquals(expectedResults.size(), results.size());
+    for (int i = 0; i < expectedResults.size(); i++) {
+      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
+    }
+
+    Assert.assertEquals(
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(CalciteTests.DATASOURCE3)
+              .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+              .granularity(Granularities.ALL)
+              .aggregators(
+                  ImmutableList.of(
+                      new BloomFilterAggregatorFactory(
+                          "a0:agg",
+                          new DefaultDimensionSpec("dim1", "a0:dim1"),
+                          TEST_NUM_ENTRIES
+                      ),
+                      new BloomFilterAggregatorFactory(
+                          "a1:agg",
+                          new DefaultDimensionSpec("dim2", "a1:dim2"),
+                          TEST_NUM_ENTRIES
+                      )
+                  )
+              )
+              .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
+              .build(),
+        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+    );
+  }
+
+  @Test
+  public void testBloomFilterAggExtractionFn() throws Exception
+  {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    final String sql = "SELECT\n"
+                       + "BLOOM_FILTER(SUBSTRING(dim1, 1, 1), 1000)\n"
+                       + "FROM numfoo";
+
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
+
+    BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
+    for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
+      String raw = NullHandling.emptyToNullIfNeeded((String) row.getRaw("dim1"));
+      // empty string extractionFn produces null
+      if (raw == null || "".equals(raw)) {
+        expected1.addBytes(null, 0, 0);
+      } else {
+        expected1.addString(raw.substring(0, 1));
+      }
+    }
+    final List<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{
+            jsonMapper.writeValueAsString(expected1)
+        }
+    );
+    Assert.assertEquals(expectedResults.size(), results.size());
+    for (int i = 0; i < expectedResults.size(); i++) {
+      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
+    }
+
+    Assert.assertEquals(
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(CalciteTests.DATASOURCE3)
+              .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+              .granularity(Granularities.ALL)
+              .aggregators(
+                  ImmutableList.of(
+                      new BloomFilterAggregatorFactory(
+                          "a0:agg",
+                          new ExtractionDimensionSpec(
+                              "dim1",
+                              "a0:dim1",
+                              new SubstringDimExtractionFn(0, 1)
+                          ),
+                          TEST_NUM_ENTRIES
+                      )
+                  )
+              )
+              .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
+              .build(),
+        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+    );
+  }
+
+  @Test
+  public void testBloomFilterAggLong() throws Exception
+  {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+
+    final String sql = "SELECT\n"
+                       + "BLOOM_FILTER(l1, 1000)\n"
+                       + "FROM numfoo";
+
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
+
+
+    BloomKFilter expected3 = new BloomKFilter(TEST_NUM_ENTRIES);
+    for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
+      Object raw = row.getRaw("l1");
+      if (raw == null) {
+        if (NullHandling.replaceWithDefault()) {
+          expected3.addLong(NullHandling.defaultLongValue());
+        } else {
+          expected3.addBytes(null, 0, 0);
+        }
+      } else {
+        expected3.addLong(((Number) raw).longValue());
+      }
+    }
+    final List<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{
+            jsonMapper.writeValueAsString(expected3)
+        }
+    );
+    Assert.assertEquals(expectedResults.size(), results.size());
+    for (int i = 0; i < expectedResults.size(); i++) {
+      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
+    }
+
+    Assert.assertEquals(
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(CalciteTests.DATASOURCE3)
+              .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+              .granularity(Granularities.ALL)
+              .aggregators(
+                  ImmutableList.of(
+                    new BloomFilterAggregatorFactory(
+                        "a0:agg",
+                        new DefaultDimensionSpec("l1", "a0:l1", ValueType.LONG),
+                        TEST_NUM_ENTRIES
+                    )
+                )
+              )
+              .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
+              .build(),
+        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+    );
+  }
+
+  @Test
+  public void testBloomFilterAggLongVirtualColumn() throws Exception
+  {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    final String sql = "SELECT\n"
+                       + "BLOOM_FILTER(l1 * 2, 1000)\n"
+                       + "FROM numfoo";
+
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
+
+    BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
+    for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
+      Object raw = row.getRaw("l1");
+      if (raw == null) {
+        if (NullHandling.replaceWithDefault()) {
+          expected1.addLong(NullHandling.defaultLongValue());
+        } else {
+          expected1.addBytes(null, 0, 0);
+        }
+      } else {
+        expected1.addLong(2 * ((Number) raw).longValue());
+      }
+    }
+    final List<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{
+            jsonMapper.writeValueAsString(expected1)
+        }
+    );
+    Assert.assertEquals(expectedResults.size(), results.size());
+    for (int i = 0; i < expectedResults.size(); i++) {
+      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
+    }
+
+    Assert.assertEquals(
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(CalciteTests.DATASOURCE3)
+              .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+              .granularity(Granularities.ALL)
+              .virtualColumns(
+                  new ExpressionVirtualColumn(
+                    "a0:agg:v",
+                    "(\"l1\" * 2)",
+                    ValueType.LONG,
+                    TestExprMacroTable.INSTANCE
+                )
+              )
+              .aggregators(
+                  ImmutableList.of(
+                    new BloomFilterAggregatorFactory(
+                        "a0:agg",
+                        new DefaultDimensionSpec("a0:agg:v", "a0:agg:v"),
+                        TEST_NUM_ENTRIES
+                    )
+                )
+              )
+              .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
+              .build(),
+        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+    );
+  }
+
+  @Test
+  public void testBloomFilterAggFloatVirtualColumn() throws Exception
+  {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    final String sql = "SELECT\n"
+                       + "BLOOM_FILTER(f1 * 2, 1000)\n"
+                       + "FROM numfoo";
+
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
+
+    BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
+    for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
+      Object raw = row.getRaw("f1");
+      if (raw == null) {
+        if (NullHandling.replaceWithDefault()) {
+          expected1.addFloat(NullHandling.defaultFloatValue());
+        } else {
+          expected1.addBytes(null, 0, 0);
+        }
+      } else {
+        expected1.addFloat(2 * ((Number) raw).floatValue());
+      }
+    }
+    final List<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{
+            jsonMapper.writeValueAsString(expected1)
+        }
+    );
+    Assert.assertEquals(expectedResults.size(), results.size());
+    for (int i = 0; i < expectedResults.size(); i++) {
+      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
+    }
+
+    // Verify query
+    Assert.assertEquals(
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(CalciteTests.DATASOURCE3)
+              .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+              .granularity(Granularities.ALL)
+              .virtualColumns(
+                  new ExpressionVirtualColumn(
+                    "a0:agg:v",
+                    "(\"f1\" * 2)",
+                    ValueType.FLOAT,
+                    TestExprMacroTable.INSTANCE
+                )
+              )
+              .aggregators(
+                  ImmutableList.of(
+                    new BloomFilterAggregatorFactory(
+                        "a0:agg",
+                        new DefaultDimensionSpec("a0:agg:v", "a0:agg:v"),
+                        TEST_NUM_ENTRIES
+                    )
+                )
+              )
+              .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
+              .build(),
+        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+    );
+  }
+
+  @Test
+  public void testBloomFilterAggDoubleVirtualColumn() throws Exception
+  {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    final String sql = "SELECT\n"
+                       + "BLOOM_FILTER(d1 * 2, 1000)\n"
+                       + "FROM numfoo";
+
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
+
+    BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
+    for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
+      Object raw = row.getRaw("d1");
+      if (raw == null) {
+        if (NullHandling.replaceWithDefault()) {
+          expected1.addDouble(NullHandling.defaultDoubleValue());
+        } else {
+          expected1.addBytes(null, 0, 0);
+        }
+      } else {
+        expected1.addDouble(2 * ((Number) raw).doubleValue());
+      }
+    }
+    final List<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{
+            jsonMapper.writeValueAsString(expected1)
+        }
+    );
+    Assert.assertEquals(expectedResults.size(), results.size());
+    for (int i = 0; i < expectedResults.size(); i++) {
+      Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
+    }
+
+    // Verify query
+    Assert.assertEquals(
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource(CalciteTests.DATASOURCE3)
+              .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+              .granularity(Granularities.ALL)
+              .virtualColumns(
+                  new ExpressionVirtualColumn(
+                    "a0:agg:v",
+                    "(\"d1\" * 2)",
+                    ValueType.DOUBLE,
+                    TestExprMacroTable.INSTANCE
+                )
+              )
+              .aggregators(
+                  ImmutableList.of(
+                    new BloomFilterAggregatorFactory(
+                        "a0:agg",
+                        new DefaultDimensionSpec("a0:agg:v", "a0:agg:v"),
+                        TEST_NUM_ENTRIES
+                    )
+                )
+              )
+              .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
+              .build(),
+        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+    );
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
index fe9e72f..766bf92 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
@@ -66,6 +66,9 @@ public class PlannerConfig
   @JsonProperty
   private DateTimeZone sqlTimeZone = DateTimeZone.UTC;
 
+  @JsonProperty
+  private boolean serializeComplexValues = true;
+
   public Period getMetadataRefreshPeriod()
   {
     return metadataRefreshPeriod;
@@ -121,6 +124,11 @@ public class PlannerConfig
     return awaitInitializationOnStart;
   }
 
+  public boolean shouldSerializeComplexValues()
+  {
+    return serializeComplexValues;
+  }
+
   public PlannerConfig withOverrides(final Map<String, Object> context)
   {
     if (context == null) {
@@ -151,6 +159,7 @@ public class PlannerConfig
     newConfig.requireTimeCondition = isRequireTimeCondition();
     newConfig.sqlTimeZone = getSqlTimeZone();
     newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart();
+    newConfig.serializeComplexValues = shouldSerializeComplexValues();
     return newConfig;
   }
 
@@ -191,6 +200,7 @@ public class PlannerConfig
            useFallback == that.useFallback &&
            requireTimeCondition == that.requireTimeCondition &&
            awaitInitializationOnStart == that.awaitInitializationOnStart &&
+           serializeComplexValues == that.serializeComplexValues &&
            Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) &&
            Objects.equals(sqlTimeZone, that.sqlTimeZone);
   }
@@ -210,7 +220,8 @@ public class PlannerConfig
         useFallback,
         requireTimeCondition,
         awaitInitializationOnStart,
-        sqlTimeZone
+        sqlTimeZone,
+        serializeComplexValues
     );
   }
 
@@ -229,6 +240,7 @@ public class PlannerConfig
            ", requireTimeCondition=" + requireTimeCondition +
            ", awaitInitializationOnStart=" + awaitInitializationOnStart +
            ", sqlTimeZone=" + sqlTimeZone +
+           ", serializeComplexValues=" + serializeComplexValues +
            '}';
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java
index b3f7d0b..658fb54 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.sql.calcite.rel;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -493,8 +494,17 @@ public class QueryMaker
         throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
       }
     } else if (sqlType == SqlTypeName.OTHER) {
-      // Complex type got out somehow.
-      coercedValue = value.getClass().getName();
+      // Complex type: serialize if the planner is configured to, else return the class name
+      if (plannerContext.getPlannerConfig().shouldSerializeComplexValues()) {
+        try {
+          coercedValue = jsonMapper.writeValueAsString(value);
+        }
+        catch (JsonProcessingException jex) {
+          throw new ISE(jex, "Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
+        }
+      } else {
+        coercedValue = value.getClass().getName();
+      }
     } else {
       throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
     }
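The effect on clients, as exercised by the tests below: with `serializeComplexValues` enabled, a complex column arrives in its serialized JSON form (e.g. `"AQAAAQAAAALFBA=="` for a hyperUnique column); when disabled, it degrades to the value's class name. A condensed restatement of the branch added above, as an illustrative helper rather than the exact code:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

final class ComplexValueCoercionSketch
{
  static Object coerceComplex(ObjectMapper jsonMapper, Object value, boolean serializeComplexValues)
      throws JsonProcessingException
  {
    return serializeComplexValues
           ? jsonMapper.writeValueAsString(value)  // serialized form, e.g. "\"AQAAAQAAAALFBA==\""
           : value.getClass().getName();           // previous behavior: class name only
  }
}
```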
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index b5d0831..6b47b42 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -101,6 +101,14 @@ public class BaseCalciteQueryTest extends CalciteTestBase
   public static final Logger log = new Logger(BaseCalciteQueryTest.class);
 
   public static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();
+  public static final PlannerConfig PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE = new PlannerConfig()
+  {
+    @Override
+    public boolean shouldSerializeComplexValues()
+    {
+      return false;
+    }
+  };
   public static final PlannerConfig PLANNER_CONFIG_REQUIRE_TIME_CONDITION = new PlannerConfig()
   {
     @Override
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 46aed6e..b89ae45 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -422,7 +422,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   {
     String hyperLogLogCollectorClassName = HLLC_STRING;
     testQuery(
+        PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
+        QUERY_CONTEXT_DEFAULT,
         "SELECT * FROM druid.foo",
+        CalciteTests.REGULAR_USER_AUTH_RESULT,
         ImmutableList.of(
             newScanQueryBuilder()
                 .dataSource(CalciteTests.DATASOURCE1)
@@ -474,7 +477,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 "abcd",
                 9999.0f,
                 NullHandling.defaultDoubleValue(),
-                HLLC_STRING
+                "\"AQAAAQAAAALFBA==\""
             }
         )
     );
@@ -518,7 +521,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   public void testSelectStarWithLimit() throws Exception
   {
     testQuery(
+        PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
+        QUERY_CONTEXT_DEFAULT,
         "SELECT * FROM druid.foo LIMIT 2",
+        CalciteTests.REGULAR_USER_AUTH_RESULT,
         ImmutableList.of(
             newScanQueryBuilder()
                 .dataSource(CalciteTests.DATASOURCE1)
@@ -565,7 +571,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   public void testSelectStarWithLimitTimeDescending() throws Exception
   {
     testQuery(
+        PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
+        QUERY_CONTEXT_DEFAULT,
         "SELECT * FROM druid.foo ORDER BY __time DESC LIMIT 2",
+        CalciteTests.REGULAR_USER_AUTH_RESULT,
         ImmutableList.of(
             Druids.newSelectQueryBuilder()
                   .dataSource(CalciteTests.DATASOURCE1)
@@ -589,7 +598,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   public void testSelectStarWithoutLimitTimeAscending() throws Exception
   {
     testQuery(
+        PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
+        QUERY_CONTEXT_DEFAULT,
         "SELECT * FROM druid.foo ORDER BY __time",
+        CalciteTests.REGULAR_USER_AUTH_RESULT,
         ImmutableList.of(
             Druids.newSelectQueryBuilder()
                   .dataSource(CalciteTests.DATASOURCE1)
@@ -1852,7 +1864,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   public void testSelectStarWithDimFilter() throws Exception
   {
     testQuery(
+        PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
+        QUERY_CONTEXT_DEFAULT,
         "SELECT * FROM druid.foo WHERE dim1 > 'd' OR dim2 = 'a'",
+        CalciteTests.REGULAR_USER_AUTH_RESULT,
         ImmutableList.of(
             newScanQueryBuilder()
                 .dataSource(CalciteTests.DATASOURCE1)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
index 8b1c9aa..14b2268 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
@@ -112,7 +112,14 @@ public class SqlResourceTest extends CalciteTestBase
   {
     walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
 
-    final PlannerConfig plannerConfig = new PlannerConfig();
+    final PlannerConfig plannerConfig = new PlannerConfig()
+    {
+      @Override
+      public boolean shouldSerializeComplexValues()
+      {
+        return false;
+      }
+    };
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
     final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();

