You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2020/06/19 18:35:36 UTC

[druid] branch master updated: fix topn on string columns with non-sorted or non-unique dictionaries (#10053)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c2f5d45  fix topn on string columns with non-sorted or non-unique dictionaries (#10053)
c2f5d45 is described below

commit c2f5d453f87d0863fba532e9ff4f3e7369db12e3
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Jun 19 11:35:18 2020 -0700

    fix topn on string columns with non-sorted or non-unique dictionaries (#10053)
    
    * fix topn on string columns with non-sorted or non-unique dictionaries
    
    * fix metadata tests
    
    * refactor, clarify comments and code, fix ci failures
---
 .../benchmark/TopNTypeInterfaceBenchmark.java      |   4 +-
 .../apache/druid/query/topn/TopNQueryEngine.java   |  88 +++++++++-----
 .../types/StringTopNColumnAggregatesProcessor.java |   2 +-
 .../org/apache/druid/server/QueryStackTests.java   |  29 +++++
 .../druid/sql/avatica/DruidAvaticaHandlerTest.java |  12 ++
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |   5 +-
 .../apache/druid/sql/calcite/CalciteQueryTest.java |  43 +++++++
 .../druid/sql/calcite/util/CalciteTests.java       | 132 +++++++++++++++++++--
 .../util/SpecificSegmentsQuerySegmentWalker.java   |  10 +-
 9 files changed, 276 insertions(+), 49 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
index 241bd80..bf7733d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
@@ -165,7 +165,7 @@ public class TopNTypeInterfaceBenchmark
       queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf"));
       queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper"));
 
-      // Use an IdentityExtractionFn to force usage of DimExtractionTopNAlgorithm
+      // Use an IdentityExtractionFn to force usage of HeapBasedTopNAlgorithm
       TopNQueryBuilder queryBuilderString = new TopNQueryBuilder()
           .dataSource("blah")
           .granularity(Granularities.ALL)
@@ -174,7 +174,7 @@ public class TopNTypeInterfaceBenchmark
           .intervals(intervalSpec)
           .aggregators(queryAggs);
 
-      // DimExtractionTopNAlgorithm is always used for numeric columns
+      // HeapBasedTopNAlgorithm is always used for numeric columns
       TopNQueryBuilder queryBuilderLong = new TopNQueryBuilder()
           .dataSource("blah")
           .granularity(Granularities.ALL)
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index 1d1bccd..f1eab7f 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -120,36 +120,34 @@ public class TopNQueryEngine
 
 
     final TopNAlgorithm<?, ?> topNAlgorithm;
-    if (
-        selector.isHasExtractionFn() &&
+    if (requiresHeapAlgorithm(selector, query, columnCapabilities)) {
+      // heap based algorithm selection
+      if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
         // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
-        // Once we have arbitrary dimension types following check should be replaced by checking
-        // that the column is of type long and single-value.
-        dimension.equals(ColumnHolder.TIME_COLUMN_NAME)
-        ) {
-      // A special TimeExtractionTopNAlgorithm is required, since DimExtractionTopNAlgorithm
-      // currently relies on the dimension cardinality to support lexicographic sorting
-      topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
-    } else if (selector.isHasExtractionFn()) {
-      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (columnCapabilities == null || !(columnCapabilities.getType() == ValueType.STRING
-                                               && columnCapabilities.isDictionaryEncoded())) {
-      // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings, and for things we don't know
-      // which can happen for 'inline' data sources when this is run on the broker
-      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
-      // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
-      // a many-to-one mapping, since numeric types can't represent all possible values of other types.)
-      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (selector.isAggregateAllMetrics()) {
-      // sorted by dimension
-      topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
-    } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
-      // high cardinality dimensions with larger result sets
-      topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
+        // We might be able to use this for any long column with an extraction function, that is
+        //  ValueType.LONG.equals(columnCapabilities.getType())
+        // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
+
+        // A special TimeExtractionTopNAlgorithm is required since DimExtractionTopNAlgorithm
+        // currently relies on the dimension cardinality to support lexicographic sorting
+        topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
+      } else {
+        topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
+      }
     } else {
-      // anything else
-      topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+      // pool based algorithm selection
+      if (selector.isAggregateAllMetrics()) {
+        // if sorted by dimension we should aggregate all metrics in a single pass, use the regular pooled algorithm for
+        // this
+        topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+      } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
+        // for high cardinality dimensions with larger result sets we aggregate with only the ordering aggregation to
+        // compute the first 'n' values, and then for the rest of the metrics but for only the 'n' values
+        topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
+      } else {
+        // anything else, use the regular pooled algorithm
+        topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+      }
     }
     if (queryMetrics != null) {
       queryMetrics.algorithm(topNAlgorithm);
@@ -158,6 +156,40 @@ public class TopNQueryEngine
     return new TopNMapFn(query, topNAlgorithm);
   }
 
+  /**
+   * {@link PooledTopNAlgorithm} (and {@link AggregateTopNMetricFirstAlgorithm} which utilizes the pooled
+   * algorithm) are optimized off-heap algorithms for aggregating dictionary encoded string columns. These algorithms
+   * rely on dictionary ids being unique so to aggregate on the dictionary ids directly and defer
+   * {@link org.apache.druid.segment.DimensionSelector#lookupName(int)} until as late as possible in query processing.
+   *
+   * When these conditions are not true, we have an on-heap fall-back algorithm, the {@link HeapBasedTopNAlgorithm}
+   * (and {@link TimeExtractionTopNAlgorithm} for a specialized form for long columns) which aggregates on values of
+   * selectors.
+   */
+  private static boolean requiresHeapAlgorithm(
+      final TopNAlgorithmSelector selector,
+      final TopNQuery query,
+      final ColumnCapabilities capabilities
+  )
+  {
+    if (selector.isHasExtractionFn()) {
+      // extraction functions can have a many to one mapping, and should use a heap algorithm
+      return true;
+    }
+
+    if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
+      // non-string output cannot use the pooled algorith, even if the underlying selector supports it
+      return true;
+    }
+    if (capabilities != null && capabilities.getType() == ValueType.STRING) {
+      // string columns must use the on heap algorithm unless they have the following capabilites
+      return !(capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue());
+    } else {
+      // non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm
+      return true;
+    }
+  }
+
   public static boolean canApplyExtractionInPost(TopNQuery query)
   {
     return query.getDimensionSpec() != null
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
index cea7c77..92b2e8a 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
@@ -59,7 +59,7 @@ public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregates
       throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
     }
 
-    // This method is used for the DimExtractionTopNAlgorithm only.
+    // This method is used for the HeapBasedTopNAlgorithm only.
     // Unlike regular topN we cannot rely on ordering to optimize.
     // Optimization possibly requires a reverse lookup from value to ID, which is
     // not possible when applying an extraction function
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 445d8d6..8a468d2 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -25,9 +25,12 @@ import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -41,6 +44,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
 import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
 import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
@@ -61,7 +65,10 @@ import org.apache.druid.query.topn.TopNQueryRunnerFactory;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.SegmentWrangler;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.join.InlineJoinableFactory;
 import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.LookupJoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactoryTest;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
@@ -273,4 +280,26 @@ public class QueryStackTests
     return conglomerate;
   }
 
+  public static JoinableFactory makeJoinableFactoryForLookup(
+      LookupExtractorFactoryContainerProvider lookupProvider
+  )
+  {
+    return makeJoinableFactoryFromDefault(lookupProvider, null);
+  }
+
+  public static JoinableFactory makeJoinableFactoryFromDefault(
+      @Nullable LookupExtractorFactoryContainerProvider lookupProvider,
+      @Nullable Map<Class<? extends DataSource>, JoinableFactory> custom
+  )
+  {
+    ImmutableMap.Builder<Class<? extends DataSource>, JoinableFactory> builder = ImmutableMap.builder();
+    builder.put(InlineDataSource.class, new InlineJoinableFactory());
+    if (lookupProvider != null) {
+      builder.put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider));
+    }
+    if (custom != null) {
+      builder.putAll(custom);
+    }
+    return MapJoinableFactoryTest.fromMap(builder.build());
+  }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 6b836b8..3f43f76 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -386,6 +386,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
         ImmutableList.of(
             row(
                 Pair.of("TABLE_CAT", "druid"),
+                Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE),
+                Pair.of("TABLE_SCHEM", "druid"),
+                Pair.of("TABLE_TYPE", "TABLE")
+            ),
+            row(
+                Pair.of("TABLE_CAT", "druid"),
                 Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
                 Pair.of("TABLE_SCHEM", "druid"),
                 Pair.of("TABLE_TYPE", "TABLE")
@@ -443,6 +449,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
         ImmutableList.of(
             row(
                 Pair.of("TABLE_CAT", "druid"),
+                Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE),
+                Pair.of("TABLE_SCHEM", "druid"),
+                Pair.of("TABLE_TYPE", "TABLE")
+            ),
+            row(
+                Pair.of("TABLE_CAT", "druid"),
                 Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
                 Pair.of("TABLE_SCHEM", "druid"),
                 Pair.of("TABLE_TYPE", "TABLE")
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 952b3cf..7c475e5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -422,7 +422,10 @@ public class BaseCalciteQueryTest extends CalciteTestBase
   @Before
   public void setUp() throws Exception
   {
-    walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
+    walker = CalciteTests.createMockWalker(
+        conglomerate,
+        temporaryFolder.newFolder()
+    );
   }
 
   @After
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1606ef7..622842f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
@@ -712,6 +713,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
+            .add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
@@ -741,6 +743,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         CalciteTests.SUPER_USER_AUTH_RESULT,
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
+            .add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
@@ -14997,6 +15000,46 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     );
   }
 
+  @Test
+  @Parameters(source = QueryContextForJoinProvider.class)
+  public void testTopNOnStringWithNonSortedOrUniqueDictionary(Map<String, Object> queryContext) throws Exception
+  {
+    testQuery(
+        "SELECT druid.broadcast.dim4, COUNT(*)\n"
+        + "FROM druid.numfoo\n"
+        + "INNER JOIN druid.broadcast ON numfoo.dim4 = broadcast.dim4\n"
+        + "GROUP BY 1 ORDER BY 2 LIMIT 4",
+        queryContext,
+        ImmutableList.of(
+            new TopNQueryBuilder()
+                .dataSource(
+                    join(
+                        new TableDataSource(CalciteTests.DATASOURCE3),
+                        new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE),
+                        "j0.",
+                        equalsCondition(
+                            DruidExpression.fromColumn("dim4"),
+                            DruidExpression.fromColumn("j0.dim4")
+                        ),
+                        JoinType.INNER
+
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .dimension(new DefaultDimensionSpec("j0.dim4", "_d0", ValueType.STRING))
+                .threshold(4)
+                .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                .context(queryContext)
+                .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"a", 9L},
+            new Object[]{"b", 9L}
+        )
+    );
+  }
+
   /**
    * This is a provider of query contexts that should be used by join tests.
    * It tests various configs that can be passed to join queries. All the configs provided by this provider should
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index ca683a2..dfee466 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -50,13 +50,17 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.ExpressionModule;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
+import org.apache.druid.query.GlobalTableDataSource;
+import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QuerySegmentWalker;
@@ -72,8 +76,14 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.join.MapJoinableFactory;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.Joinable;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.table.IndexedTableJoinable;
+import org.apache.druid.segment.join.table.RowBasedIndexedTable;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.DruidNode;
@@ -125,6 +135,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.function.BooleanSupplier;
@@ -140,6 +151,7 @@ public class CalciteTests
   public static final String DATASOURCE3 = "numfoo";
   public static final String DATASOURCE4 = "foo4";
   public static final String DATASOURCE5 = "lotsocolumns";
+  public static final String BROADCAST_DATASOURCE = "broadcast";
   public static final String FORBIDDEN_DATASOURCE = "forbiddenDatasource";
   public static final String SOME_DATASOURCE = "some_datasource";
   public static final String SOME_DATSOURCE_ESCAPED = "some\\_datasource";
@@ -214,7 +226,7 @@ public class CalciteTests
 
   private static final String TIMESTAMP_COLUMN = "t";
 
-  private static final Injector INJECTOR = Guice.createInjector(
+  public static final Injector INJECTOR = Guice.createInjector(
       binder -> {
         binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
 
@@ -605,6 +617,68 @@ public class CalciteTests
       )
   );
 
+  private static final InlineDataSource JOINABLE_BACKING_DATA = InlineDataSource.fromIterable(
+      RAW_ROWS1_WITH_NUMERIC_DIMS.stream().map(x -> new Object[]{
+          x.get("dim1"),
+          x.get("dim2"),
+          x.get("dim3"),
+          x.get("dim4"),
+          x.get("dim5"),
+          x.get("d1"),
+          x.get("d2"),
+          x.get("f1"),
+          x.get("f2"),
+          x.get("l1"),
+          x.get("l2")
+      }).collect(Collectors.toList()),
+      RowSignature.builder()
+                  .add("dim1", ValueType.STRING)
+                  .add("dim2", ValueType.STRING)
+                  .add("dim3", ValueType.STRING)
+                  .add("dim4", ValueType.STRING)
+                  .add("dim5", ValueType.STRING)
+                  .add("d1", ValueType.DOUBLE)
+                  .add("d2", ValueType.DOUBLE)
+                  .add("f1", ValueType.FLOAT)
+                  .add("f2", ValueType.FLOAT)
+                  .add("l1", ValueType.LONG)
+                  .add("l2", ValueType.LONG)
+                  .build()
+  );
+
+  private static final Set<String> KEY_COLUMNS = ImmutableSet.of("dim4");
+
+  private static final RowBasedIndexedTable JOINABLE_TABLE = new RowBasedIndexedTable(
+      JOINABLE_BACKING_DATA.getRowsAsList(),
+      JOINABLE_BACKING_DATA.rowAdapter(),
+      JOINABLE_BACKING_DATA.getRowSignature(),
+      KEY_COLUMNS,
+      DateTimes.nowUtc().toString()
+  );
+
+  public static GlobalTableDataSource CUSTOM_TABLE = new GlobalTableDataSource(BROADCAST_DATASOURCE);
+
+  public static JoinableFactory CUSTOM_ROW_TABLE_JOINABLE = new JoinableFactory()
+  {
+    @Override
+    public boolean isDirectlyJoinable(DataSource dataSource)
+    {
+      return CUSTOM_TABLE.equals(dataSource);
+    }
+
+    @Override
+    public Optional<Joinable> build(
+        DataSource dataSource,
+        JoinConditionAnalysis condition
+    )
+    {
+      if (dataSource instanceof GlobalTableDataSource) {
+        return Optional.of(new IndexedTableJoinable(JOINABLE_TABLE));
+      }
+      return Optional.empty();
+    }
+  };
+
   private CalciteTests()
   {
     // No instantiation.
@@ -649,12 +723,28 @@ public class CalciteTests
     return INJECTOR.getInstance(Key.get(ObjectMapper.class, Json.class));
   }
 
+  public static JoinableFactory createDefaultJoinableFactory()
+  {
+    return QueryStackTests.makeJoinableFactoryFromDefault(
+        INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
+        ImmutableMap.of(
+            GlobalTableDataSource.class,
+            CUSTOM_ROW_TABLE_JOINABLE
+        )
+    );
+  }
+
   public static SpecificSegmentsQuerySegmentWalker createMockWalker(
       final QueryRunnerFactoryConglomerate conglomerate,
       final File tmpDir
   )
   {
-    return createMockWalker(conglomerate, tmpDir, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
+    return createMockWalker(
+        conglomerate,
+        tmpDir,
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER,
+        createDefaultJoinableFactory()
+    );
   }
 
   public static SpecificSegmentsQuerySegmentWalker createMockWalker(
@@ -663,6 +753,16 @@ public class CalciteTests
       final QueryScheduler scheduler
   )
   {
+    return createMockWalker(conglomerate, tmpDir, scheduler, null);
+  }
+
+  public static SpecificSegmentsQuerySegmentWalker createMockWalker(
+      final QueryRunnerFactoryConglomerate conglomerate,
+      final File tmpDir,
+      final QueryScheduler scheduler,
+      final JoinableFactory joinableFactory
+  )
+  {
     final QueryableIndex index1 = IndexBuilder
         .create()
         .tmpDir(new File(tmpDir, "1"))
@@ -713,7 +813,7 @@ public class CalciteTests
 
     final QueryableIndex someDatasourceIndex = IndexBuilder
         .create()
-        .tmpDir(new File(tmpDir, "1"))
+        .tmpDir(new File(tmpDir, "6"))
         .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
         .schema(INDEX_SCHEMA)
         .rows(ROWS1)
@@ -721,7 +821,7 @@ public class CalciteTests
 
     final QueryableIndex someXDatasourceIndex = IndexBuilder
         .create()
-        .tmpDir(new File(tmpDir, "1"))
+        .tmpDir(new File(tmpDir, "7"))
         .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
         .schema(INDEX_SCHEMA_WITH_X_COLUMNS)
         .rows(RAW_ROWS1_X)
@@ -731,7 +831,7 @@ public class CalciteTests
     return new SpecificSegmentsQuerySegmentWalker(
         conglomerate,
         INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
-        null,
+        joinableFactory,
         scheduler
     ).add(
         DataSegment.builder()
@@ -805,6 +905,15 @@ public class CalciteTests
                    .size(0)
                    .build(),
         someXDatasourceIndex
+    ).add(
+        DataSegment.builder()
+                   .dataSource(BROADCAST_DATASOURCE)
+                   .interval(indexNumericDims.getDataInterval())
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build(),
+        indexNumericDims
     );
   }
 
@@ -979,8 +1088,15 @@ public class CalciteTests
     final DruidSchema schema = new DruidSchema(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         new TestServerInventoryView(walker.getSegments()),
-        new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
-        new MapJoinableFactory(ImmutableMap.of()),
+        new SegmentManager(EasyMock.createMock(SegmentLoader.class))
+        {
+          @Override
+          public Set<String> getDataSourceNames()
+          {
+            return ImmutableSet.of(BROADCAST_DATASOURCE);
+          }
+        },
+        createDefaultJoinableFactory(),
         plannerConfig,
         viewManager,
         TEST_AUTHENTICATOR_ESCALATOR
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 0490420..7fde642 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -40,10 +40,7 @@ import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentWrangler;
-import org.apache.druid.segment.join.InlineJoinableFactory;
 import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.LookupJoinableFactory;
-import org.apache.druid.segment.join.MapJoinableFactoryTest;
 import org.apache.druid.server.ClientQuerySegmentWalker;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.QueryStackTests;
@@ -91,12 +88,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
     final JoinableFactory joinableFactoryToUse;
 
     if (joinableFactory == null) {
-      joinableFactoryToUse = MapJoinableFactoryTest.fromMap(
-          ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
-              .put(InlineDataSource.class, new InlineJoinableFactory())
-              .put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider))
-              .build()
-      );
+      joinableFactoryToUse = QueryStackTests.makeJoinableFactoryForLookup(lookupProvider);
     } else {
       joinableFactoryToUse = joinableFactory;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org