You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2020/06/19 18:35:36 UTC
[druid] branch master updated: fix topn on string columns with
non-sorted or non-unique dictionaries (#10053)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c2f5d45 fix topn on string columns with non-sorted or non-unique dictionaries (#10053)
c2f5d45 is described below
commit c2f5d453f87d0863fba532e9ff4f3e7369db12e3
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Jun 19 11:35:18 2020 -0700
fix topn on string columns with non-sorted or non-unique dictionaries (#10053)
* fix topn on string columns with non-sorted or non-unique dictionaries
* fix metadata tests
* refactor, clarify comments and code, fix ci failures
---
.../benchmark/TopNTypeInterfaceBenchmark.java | 4 +-
.../apache/druid/query/topn/TopNQueryEngine.java | 88 +++++++++-----
.../types/StringTopNColumnAggregatesProcessor.java | 2 +-
.../org/apache/druid/server/QueryStackTests.java | 29 +++++
.../druid/sql/avatica/DruidAvaticaHandlerTest.java | 12 ++
.../druid/sql/calcite/BaseCalciteQueryTest.java | 5 +-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 43 +++++++
.../druid/sql/calcite/util/CalciteTests.java | 132 +++++++++++++++++++--
.../util/SpecificSegmentsQuerySegmentWalker.java | 10 +-
9 files changed, 276 insertions(+), 49 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
index 241bd80..bf7733d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
@@ -165,7 +165,7 @@ public class TopNTypeInterfaceBenchmark
queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf"));
queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper"));
- // Use an IdentityExtractionFn to force usage of DimExtractionTopNAlgorithm
+ // Use an IdentityExtractionFn to force usage of HeapBasedTopNAlgorithm
TopNQueryBuilder queryBuilderString = new TopNQueryBuilder()
.dataSource("blah")
.granularity(Granularities.ALL)
@@ -174,7 +174,7 @@ public class TopNTypeInterfaceBenchmark
.intervals(intervalSpec)
.aggregators(queryAggs);
- // DimExtractionTopNAlgorithm is always used for numeric columns
+ // HeapBasedTopNAlgorithm is always used for numeric columns
TopNQueryBuilder queryBuilderLong = new TopNQueryBuilder()
.dataSource("blah")
.granularity(Granularities.ALL)
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index 1d1bccd..f1eab7f 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -120,36 +120,34 @@ public class TopNQueryEngine
final TopNAlgorithm<?, ?> topNAlgorithm;
- if (
- selector.isHasExtractionFn() &&
+ if (requiresHeapAlgorithm(selector, query, columnCapabilities)) {
+ // heap based algorithm selection
+ if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
// TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
- // Once we have arbitrary dimension types following check should be replaced by checking
- // that the column is of type long and single-value.
- dimension.equals(ColumnHolder.TIME_COLUMN_NAME)
- ) {
- // A special TimeExtractionTopNAlgorithm is required, since DimExtractionTopNAlgorithm
- // currently relies on the dimension cardinality to support lexicographic sorting
- topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
- } else if (selector.isHasExtractionFn()) {
- topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
- } else if (columnCapabilities == null || !(columnCapabilities.getType() == ValueType.STRING
- && columnCapabilities.isDictionaryEncoded())) {
- // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings, and for things we don't know
- // which can happen for 'inline' data sources when this is run on the broker
- topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
- } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
- // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
- // a many-to-one mapping, since numeric types can't represent all possible values of other types.)
- topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
- } else if (selector.isAggregateAllMetrics()) {
- // sorted by dimension
- topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
- } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
- // high cardinality dimensions with larger result sets
- topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
+ // We might be able to use this for any long column with an extraction function, that is
+ // ValueType.LONG.equals(columnCapabilities.getType())
+ // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
+
+ // A special TimeExtractionTopNAlgorithm is required since DimExtractionTopNAlgorithm
+ // currently relies on the dimension cardinality to support lexicographic sorting
+ topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
+ } else {
+ topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
+ }
} else {
- // anything else
- topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+ // pool based algorithm selection
+ if (selector.isAggregateAllMetrics()) {
+ // if sorted by dimension we should aggregate all metrics in a single pass, use the regular pooled algorithm for
+ // this
+ topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+ } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
+ // for high cardinality dimensions with larger result sets we aggregate with only the ordering aggregation to
+ // compute the first 'n' values, and then for the rest of the metrics but for only the 'n' values
+ topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
+ } else {
+ // anything else, use the regular pooled algorithm
+ topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+ }
}
if (queryMetrics != null) {
queryMetrics.algorithm(topNAlgorithm);
@@ -158,6 +156,40 @@ public class TopNQueryEngine
return new TopNMapFn(query, topNAlgorithm);
}
+ /**
+ * {@link PooledTopNAlgorithm} (and {@link AggregateTopNMetricFirstAlgorithm} which utilizes the pooled
+ * algorithm) are optimized off-heap algorithms for aggregating dictionary encoded string columns. These algorithms
+ * rely on dictionary ids being unique, so as to aggregate on the dictionary ids directly and defer
+ * {@link org.apache.druid.segment.DimensionSelector#lookupName(int)} until as late as possible in query processing.
+ *
+ * When these conditions are not true, we have an on-heap fall-back algorithm, the {@link HeapBasedTopNAlgorithm}
+ * (and {@link TimeExtractionTopNAlgorithm} for a specialized form for long columns) which aggregates on values of
+ * selectors.
+ */
+ private static boolean requiresHeapAlgorithm(
+ final TopNAlgorithmSelector selector,
+ final TopNQuery query,
+ final ColumnCapabilities capabilities
+ )
+ {
+ if (selector.isHasExtractionFn()) {
+ // extraction functions can have a many to one mapping, and should use a heap algorithm
+ return true;
+ }
+
+ if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
+ // non-string output cannot use the pooled algorithm, even if the underlying selector supports it
+ return true;
+ }
+ if (capabilities != null && capabilities.getType() == ValueType.STRING) {
+ // string columns must use the on-heap algorithm unless they have the following capabilities
+ return !(capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue());
+ } else {
+ // columns with null or non-string capabilities are not eligible to use the pooled algorithm, and should use a heap algorithm
+ return true;
+ }
+ }
+
public static boolean canApplyExtractionInPost(TopNQuery query)
{
return query.getDimensionSpec() != null
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
index cea7c77..92b2e8a 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
@@ -59,7 +59,7 @@ public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregates
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
}
- // This method is used for the DimExtractionTopNAlgorithm only.
+ // This method is used for the HeapBasedTopNAlgorithm only.
// Unlike regular topN we cannot rely on ordering to optimize.
// Optimization possibly requires a reverse lookup from value to ID, which is
// not possible when applying an extraction function
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 445d8d6..8a468d2 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -25,9 +25,12 @@ import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -41,6 +44,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
@@ -61,7 +65,10 @@ import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.LookupJoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactoryTest;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
@@ -273,4 +280,26 @@ public class QueryStackTests
return conglomerate;
}
+ public static JoinableFactory makeJoinableFactoryForLookup(
+ LookupExtractorFactoryContainerProvider lookupProvider
+ )
+ {
+ return makeJoinableFactoryFromDefault(lookupProvider, null);
+ }
+
+ public static JoinableFactory makeJoinableFactoryFromDefault(
+ @Nullable LookupExtractorFactoryContainerProvider lookupProvider,
+ @Nullable Map<Class<? extends DataSource>, JoinableFactory> custom
+ )
+ {
+ ImmutableMap.Builder<Class<? extends DataSource>, JoinableFactory> builder = ImmutableMap.builder();
+ builder.put(InlineDataSource.class, new InlineJoinableFactory());
+ if (lookupProvider != null) {
+ builder.put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider));
+ }
+ if (custom != null) {
+ builder.putAll(custom);
+ }
+ return MapJoinableFactoryTest.fromMap(builder.build());
+ }
}
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 6b836b8..3f43f76 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -386,6 +386,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
ImmutableList.of(
row(
Pair.of("TABLE_CAT", "druid"),
+ Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE),
+ Pair.of("TABLE_SCHEM", "druid"),
+ Pair.of("TABLE_TYPE", "TABLE")
+ ),
+ row(
+ Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
@@ -443,6 +449,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
ImmutableList.of(
row(
Pair.of("TABLE_CAT", "druid"),
+ Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE),
+ Pair.of("TABLE_SCHEM", "druid"),
+ Pair.of("TABLE_TYPE", "TABLE")
+ ),
+ row(
+ Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 952b3cf..7c475e5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -422,7 +422,10 @@ public class BaseCalciteQueryTest extends CalciteTestBase
@Before
public void setUp() throws Exception
{
- walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
+ walker = CalciteTests.createMockWalker(
+ conglomerate,
+ temporaryFolder.newFolder()
+ );
}
@After
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1606ef7..622842f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
@@ -712,6 +713,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
+ "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
ImmutableList.of(),
ImmutableList.<Object[]>builder()
+ .add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
@@ -741,6 +743,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
CalciteTests.SUPER_USER_AUTH_RESULT,
ImmutableList.of(),
ImmutableList.<Object[]>builder()
+ .add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
@@ -14997,6 +15000,46 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
+ @Test
+ @Parameters(source = QueryContextForJoinProvider.class)
+ public void testTopNOnStringWithNonSortedOrUniqueDictionary(Map<String, Object> queryContext) throws Exception
+ {
+ testQuery(
+ "SELECT druid.broadcast.dim4, COUNT(*)\n"
+ + "FROM druid.numfoo\n"
+ + "INNER JOIN druid.broadcast ON numfoo.dim4 = broadcast.dim4\n"
+ + "GROUP BY 1 ORDER BY 2 LIMIT 4",
+ queryContext,
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE3),
+ new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE),
+ "j0.",
+ equalsCondition(
+ DruidExpression.fromColumn("dim4"),
+ DruidExpression.fromColumn("j0.dim4")
+ ),
+ JoinType.INNER
+
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .dimension(new DefaultDimensionSpec("j0.dim4", "_d0", ValueType.STRING))
+ .threshold(4)
+ .aggregators(aggregators(new CountAggregatorFactory("a0")))
+ .context(queryContext)
+ .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"a", 9L},
+ new Object[]{"b", 9L}
+ )
+ );
+ }
+
/**
* This is a provider of query contexts that should be used by join tests.
* It tests various configs that can be passed to join queries. All the configs provided by this provider should
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index ca683a2..dfee466 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -50,13 +50,17 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
+import org.apache.druid.query.GlobalTableDataSource;
+import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
@@ -72,8 +76,14 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.join.MapJoinableFactory;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.Joinable;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.table.IndexedTableJoinable;
+import org.apache.druid.segment.join.table.RowBasedIndexedTable;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
@@ -125,6 +135,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;
@@ -140,6 +151,7 @@ public class CalciteTests
public static final String DATASOURCE3 = "numfoo";
public static final String DATASOURCE4 = "foo4";
public static final String DATASOURCE5 = "lotsocolumns";
+ public static final String BROADCAST_DATASOURCE = "broadcast";
public static final String FORBIDDEN_DATASOURCE = "forbiddenDatasource";
public static final String SOME_DATASOURCE = "some_datasource";
public static final String SOME_DATSOURCE_ESCAPED = "some\\_datasource";
@@ -214,7 +226,7 @@ public class CalciteTests
private static final String TIMESTAMP_COLUMN = "t";
- private static final Injector INJECTOR = Guice.createInjector(
+ public static final Injector INJECTOR = Guice.createInjector(
binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
@@ -605,6 +617,68 @@ public class CalciteTests
)
);
+ private static final InlineDataSource JOINABLE_BACKING_DATA = InlineDataSource.fromIterable(
+ RAW_ROWS1_WITH_NUMERIC_DIMS.stream().map(x -> new Object[]{
+ x.get("dim1"),
+ x.get("dim2"),
+ x.get("dim3"),
+ x.get("dim4"),
+ x.get("dim5"),
+ x.get("d1"),
+ x.get("d2"),
+ x.get("f1"),
+ x.get("f2"),
+ x.get("l1"),
+ x.get("l2")
+ }).collect(Collectors.toList()),
+ RowSignature.builder()
+ .add("dim1", ValueType.STRING)
+ .add("dim2", ValueType.STRING)
+ .add("dim3", ValueType.STRING)
+ .add("dim4", ValueType.STRING)
+ .add("dim5", ValueType.STRING)
+ .add("d1", ValueType.DOUBLE)
+ .add("d2", ValueType.DOUBLE)
+ .add("f1", ValueType.FLOAT)
+ .add("f2", ValueType.FLOAT)
+ .add("l1", ValueType.LONG)
+ .add("l2", ValueType.LONG)
+ .build()
+ );
+
+ private static final Set<String> KEY_COLUMNS = ImmutableSet.of("dim4");
+
+ private static final RowBasedIndexedTable JOINABLE_TABLE = new RowBasedIndexedTable(
+ JOINABLE_BACKING_DATA.getRowsAsList(),
+ JOINABLE_BACKING_DATA.rowAdapter(),
+ JOINABLE_BACKING_DATA.getRowSignature(),
+ KEY_COLUMNS,
+ DateTimes.nowUtc().toString()
+ );
+
+ public static GlobalTableDataSource CUSTOM_TABLE = new GlobalTableDataSource(BROADCAST_DATASOURCE);
+
+ public static JoinableFactory CUSTOM_ROW_TABLE_JOINABLE = new JoinableFactory()
+ {
+ @Override
+ public boolean isDirectlyJoinable(DataSource dataSource)
+ {
+ return CUSTOM_TABLE.equals(dataSource);
+ }
+
+ @Override
+ public Optional<Joinable> build(
+ DataSource dataSource,
+ JoinConditionAnalysis condition
+ )
+ {
+ if (dataSource instanceof GlobalTableDataSource) {
+ return Optional.of(new IndexedTableJoinable(JOINABLE_TABLE));
+ }
+ return Optional.empty();
+ }
+ };
+
private CalciteTests()
{
// No instantiation.
@@ -649,12 +723,28 @@ public class CalciteTests
return INJECTOR.getInstance(Key.get(ObjectMapper.class, Json.class));
}
+ public static JoinableFactory createDefaultJoinableFactory()
+ {
+ return QueryStackTests.makeJoinableFactoryFromDefault(
+ INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
+ ImmutableMap.of(
+ GlobalTableDataSource.class,
+ CUSTOM_ROW_TABLE_JOINABLE
+ )
+ );
+ }
+
public static SpecificSegmentsQuerySegmentWalker createMockWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final File tmpDir
)
{
- return createMockWalker(conglomerate, tmpDir, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
+ return createMockWalker(
+ conglomerate,
+ tmpDir,
+ QueryStackTests.DEFAULT_NOOP_SCHEDULER,
+ createDefaultJoinableFactory()
+ );
}
public static SpecificSegmentsQuerySegmentWalker createMockWalker(
@@ -663,6 +753,16 @@ public class CalciteTests
final QueryScheduler scheduler
)
{
+ return createMockWalker(conglomerate, tmpDir, scheduler, null);
+ }
+
+ public static SpecificSegmentsQuerySegmentWalker createMockWalker(
+ final QueryRunnerFactoryConglomerate conglomerate,
+ final File tmpDir,
+ final QueryScheduler scheduler,
+ final JoinableFactory joinableFactory
+ )
+ {
final QueryableIndex index1 = IndexBuilder
.create()
.tmpDir(new File(tmpDir, "1"))
@@ -713,7 +813,7 @@ public class CalciteTests
final QueryableIndex someDatasourceIndex = IndexBuilder
.create()
- .tmpDir(new File(tmpDir, "1"))
+ .tmpDir(new File(tmpDir, "6"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(INDEX_SCHEMA)
.rows(ROWS1)
@@ -721,7 +821,7 @@ public class CalciteTests
final QueryableIndex someXDatasourceIndex = IndexBuilder
.create()
- .tmpDir(new File(tmpDir, "1"))
+ .tmpDir(new File(tmpDir, "7"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(INDEX_SCHEMA_WITH_X_COLUMNS)
.rows(RAW_ROWS1_X)
@@ -731,7 +831,7 @@ public class CalciteTests
return new SpecificSegmentsQuerySegmentWalker(
conglomerate,
INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
- null,
+ joinableFactory,
scheduler
).add(
DataSegment.builder()
@@ -805,6 +905,15 @@ public class CalciteTests
.size(0)
.build(),
someXDatasourceIndex
+ ).add(
+ DataSegment.builder()
+ .dataSource(BROADCAST_DATASOURCE)
+ .interval(indexNumericDims.getDataInterval())
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build(),
+ indexNumericDims
);
}
@@ -979,8 +1088,15 @@ public class CalciteTests
final DruidSchema schema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
new TestServerInventoryView(walker.getSegments()),
- new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
- new MapJoinableFactory(ImmutableMap.of()),
+ new SegmentManager(EasyMock.createMock(SegmentLoader.class))
+ {
+ @Override
+ public Set<String> getDataSourceNames()
+ {
+ return ImmutableSet.of(BROADCAST_DATASOURCE);
+ }
+ },
+ createDefaultJoinableFactory(),
plannerConfig,
viewManager,
TEST_AUTHENTICATOR_ESCALATOR
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 0490420..7fde642 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -40,10 +40,7 @@ import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentWrangler;
-import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.LookupJoinableFactory;
-import org.apache.druid.segment.join.MapJoinableFactoryTest;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
@@ -91,12 +88,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
final JoinableFactory joinableFactoryToUse;
if (joinableFactory == null) {
- joinableFactoryToUse = MapJoinableFactoryTest.fromMap(
- ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
- .put(InlineDataSource.class, new InlineJoinableFactory())
- .put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider))
- .build()
- );
+ joinableFactoryToUse = QueryStackTests.makeJoinableFactoryForLookup(lookupProvider);
} else {
joinableFactoryToUse = joinableFactory;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org