You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2020/02/01 07:51:29 UTC
[druid] branch master updated: SQL join support for lookups. (#9294)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b411443 SQL join support for lookups. (#9294)
b411443 is described below
commit b411443d228c14de746311e09db28b437c2746a5
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Fri Jan 31 23:51:16 2020 -0800
SQL join support for lookups. (#9294)
* SQL join support for lookups.
1) Add LookupSchema to SQL, so lookups show up in the catalog.
2) Add join-related rels and rules to SQL, allowing joins to be planned into
native Druid queries.
* Add two missing LookupSchema calls in tests.
* Fix tests.
* Fix typo.
---
.../apache/druid/benchmark/query/SqlBenchmark.java | 1 +
.../benchmark/query/SqlVsNativeBenchmark.java | 1 +
.../sql/TDigestSketchSqlAggregatorTest.java | 1 +
.../hll/sql/HllSketchSqlAggregatorTest.java | 1 +
.../sql/DoublesSketchSqlAggregatorTest.java | 1 +
.../theta/sql/ThetaSketchSqlAggregatorTest.java | 1 +
.../bloom/sql/BloomFilterSqlAggregatorTest.java | 1 +
...dBucketsHistogramQuantileSqlAggregatorTest.java | 1 +
.../histogram/sql/QuantileSqlAggregatorTest.java | 1 +
.../variance/sql/VarianceSqlAggregatorTest.java | 1 +
.../druid/segment/join/MapJoinableFactory.java | 2 +-
.../druid/server/ClientQuerySegmentWalker.java | 29 +-
.../apache/druid/sql/calcite/planner/Calcites.java | 31 +-
.../druid/sql/calcite/planner/PlannerFactory.java | 5 +
.../apache/druid/sql/calcite/planner/Rules.java | 6 +-
.../druid/sql/calcite/rel/DruidJoinQueryRel.java | 344 ++++++++++++++
.../druid/sql/calcite/rel/DruidOuterQueryRel.java | 2 +-
.../apache/druid/sql/calcite/rel/DruidQuery.java | 126 +++--
.../druid/sql/calcite/rel/DruidQueryRel.java | 2 +-
.../apache/druid/sql/calcite/rel/DruidRels.java | 76 +++
.../druid/sql/calcite/rel/PartialDruidQuery.java | 11 +-
.../apache/druid/sql/calcite/rel/Projection.java | 2 +-
.../sql/calcite/rel/VirtualColumnRegistry.java | 2 +-
.../druid/sql/calcite/rule/DruidJoinRule.java | 163 +++++++
.../apache/druid/sql/calcite/rule/DruidRules.java | 3 +-
.../druid/sql/calcite/schema/LookupSchema.java | 67 +++
.../druid/sql/avatica/DruidAvaticaHandlerTest.java | 2 +
.../druid/sql/avatica/DruidStatementTest.java | 1 +
.../druid/sql/calcite/BaseCalciteQueryTest.java | 67 ++-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 526 +++++++++++++++++++--
.../druid/sql/calcite/http/SqlResourceTest.java | 1 +
.../druid/sql/calcite/planner/CalcitesTest.java | 18 +-
.../druid/sql/calcite/rel/DruidRelsTest.java | 241 ++++++++++
.../druid/sql/calcite/rule/DruidJoinRuleTest.java | 203 ++++++++
.../druid/sql/calcite/util/CalciteTests.java | 10 +-
.../util/SpecificSegmentsQuerySegmentWalker.java | 419 +++++++++++-----
36 files changed, 2112 insertions(+), 257 deletions(-)
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index f51684a..0a4d2a2 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -198,6 +198,7 @@ public class SqlBenchmark
plannerFactory = new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate.lhs),
CalciteTests.createOperatorTable(),
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
index 043405d..74f5b51 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
@@ -121,6 +121,7 @@ public class SqlVsNativeBenchmark
plannerFactory = new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),
diff --git a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java
index 8056a10..0eb4571 100644
--- a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java
+++ b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java
@@ -160,6 +160,7 @@ public class TDigestSketchSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
index 77bf27c..d5c1cda 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
@@ -188,6 +188,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
index 47711ef..6d6c5ed 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
@@ -189,6 +189,7 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
index f4ef3b7..9ff3a60 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
@@ -186,6 +186,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
index 9ca5885..92dba66 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
@@ -202,6 +202,7 @@ public class BloomFilterSqlAggregatorTest extends InitializedNullHandlingTest
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java
index bb9d3d0..3d00aa8 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java
@@ -172,6 +172,7 @@ public class FixedBucketsHistogramQuantileSqlAggregatorTest extends CalciteTestB
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
index be90e8f..6b0e308 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
@@ -171,6 +171,7 @@ public class QuantileSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java b/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java
index a061f47..0247c88 100644
--- a/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java
+++ b/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java
@@ -174,6 +174,7 @@ public class VarianceSqlAggregatorTest extends InitializedNullHandlingTest
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
index 5575348..beb8106 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
@@ -36,7 +36,7 @@ public class MapJoinableFactory implements JoinableFactory
private final Map<Class<? extends DataSource>, JoinableFactory> joinableFactories;
@Inject
- MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories)
+ public MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories)
{
// Accesses to IdentityHashMap should be faster than to HashMap or ImmutableMap.
// Class doesn't override Object.equals().
diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index b484486..27bccaf 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -48,7 +48,7 @@ import org.joda.time.Interval;
public class ClientQuerySegmentWalker implements QuerySegmentWalker
{
private final ServiceEmitter emitter;
- private final CachingClusteredClient baseClient;
+ private final QuerySegmentWalker baseClient;
private final QueryToolChestWarehouse warehouse;
private final RetryQueryRunnerConfig retryConfig;
private final ObjectMapper objectMapper;
@@ -56,10 +56,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private final Cache cache;
private final CacheConfig cacheConfig;
- @Inject
public ClientQuerySegmentWalker(
ServiceEmitter emitter,
- CachingClusteredClient baseClient,
+ QuerySegmentWalker baseClient,
QueryToolChestWarehouse warehouse,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper,
@@ -78,6 +77,30 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
this.cacheConfig = cacheConfig;
}
+ @Inject
+ ClientQuerySegmentWalker(
+ ServiceEmitter emitter,
+ CachingClusteredClient baseClient,
+ QueryToolChestWarehouse warehouse,
+ RetryQueryRunnerConfig retryConfig,
+ ObjectMapper objectMapper,
+ ServerConfig serverConfig,
+ Cache cache,
+ CacheConfig cacheConfig
+ )
+ {
+ this(
+ emitter,
+ (QuerySegmentWalker) baseClient,
+ warehouse,
+ retryConfig,
+ objectMapper,
+ serverConfig,
+ cache,
+ cacheConfig
+ );
+ }
+
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
index 81f45f2..9eab11a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
@@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.planner;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Chars;
import org.apache.calcite.jdbc.CalciteSchema;
@@ -46,6 +47,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.InformationSchema;
+import org.apache.druid.sql.calcite.schema.LookupSchema;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -55,8 +57,10 @@ import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;
import org.joda.time.format.ISODateTimeFormat;
+import javax.annotation.Nullable;
import java.nio.charset.Charset;
import java.util.NavigableSet;
+import java.util.TreeSet;
import java.util.regex.Pattern;
/**
@@ -104,12 +108,14 @@ public class Calcites
public static SchemaPlus createRootSchema(
final DruidSchema druidSchema,
+ final LookupSchema lookupSchema,
final SystemSchema systemSchema,
final AuthorizerMapper authorizerMapper
)
{
final SchemaPlus rootSchema = CalciteSchema.createRootSchema(false, false).plus();
rootSchema.add(DruidSchema.NAME, druidSchema);
+ rootSchema.add(LookupSchema.NAME, lookupSchema);
rootSchema.add(InformationSchema.NAME, new InformationSchema(rootSchema, authorizerMapper));
rootSchema.add(SystemSchema.NAME, systemSchema);
return rootSchema;
@@ -137,6 +143,7 @@ public class Calcites
}
+ @Nullable
public static ValueType getValueTypeForSqlTypeName(SqlTypeName sqlTypeName)
{
if (SqlTypeName.FLOAT == sqlTypeName) {
@@ -345,23 +352,23 @@ public class Calcites
}
/**
- * Checks if a RexNode is a literal int or not. If this returns true, then {@code RexLiteral.intValue(literal)} can be
- * used to get the value of the literal.
- *
- * @param rexNode the node
- *
- * @return true if this is an int
+ * Find a string that is either equal to "basePrefix", or basePrefix prepended by underscores, and where nothing in
+ * "strings" starts with prefix plus a digit.
*/
- public static boolean isIntLiteral(final RexNode rexNode)
+ public static String findUnusedPrefixForDigits(final String basePrefix, final Iterable<String> strings)
{
- return rexNode instanceof RexLiteral && SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName());
- }
+ final NavigableSet<String> navigableStrings;
+
+ if (strings instanceof NavigableSet) {
+ navigableStrings = (NavigableSet<String>) strings;
+ } else {
+ navigableStrings = new TreeSet<>();
+ Iterables.addAll(navigableStrings, strings);
+ }
- public static String findUnusedPrefix(final String basePrefix, final NavigableSet<String> strings)
- {
String prefix = basePrefix;
- while (!isUnusedPrefix(prefix, strings)) {
+ while (!isUnusedPrefix(prefix, navigableStrings)) {
prefix = "_" + prefix;
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
index 3d5f2fa..135a621 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
@@ -42,6 +42,7 @@ import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.calcite.rel.QueryMaker;
import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.LookupSchema;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import java.util.Map;
@@ -59,6 +60,7 @@ public class PlannerFactory
.build();
private final DruidSchema druidSchema;
+ private final LookupSchema lookupSchema;
private final SystemSchema systemSchema;
private final QueryLifecycleFactory queryLifecycleFactory;
private final DruidOperatorTable operatorTable;
@@ -70,6 +72,7 @@ public class PlannerFactory
@Inject
public PlannerFactory(
final DruidSchema druidSchema,
+ final LookupSchema lookupSchema,
final SystemSchema systemSchema,
final QueryLifecycleFactory queryLifecycleFactory,
final DruidOperatorTable operatorTable,
@@ -80,6 +83,7 @@ public class PlannerFactory
)
{
this.druidSchema = druidSchema;
+ this.lookupSchema = lookupSchema;
this.systemSchema = systemSchema;
this.queryLifecycleFactory = queryLifecycleFactory;
this.operatorTable = operatorTable;
@@ -96,6 +100,7 @@ public class PlannerFactory
{
final SchemaPlus rootSchema = Calcites.createRootSchema(
druidSchema,
+ lookupSchema,
systemSchema,
authorizerMapper
);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java
index 9463ab5..b2b3659 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java
@@ -101,7 +101,7 @@ public class Rules
JoinPushExpressionsRule.INSTANCE,
FilterAggregateTransposeRule.INSTANCE,
ProjectWindowTransposeRule.INSTANCE,
- JoinCommuteRule.INSTANCE,
+ JoinCommuteRule.SWAP_OUTER,
JoinPushThroughJoinRule.RIGHT,
JoinPushThroughJoinRule.LEFT,
SortProjectTransposeRule.INSTANCE,
@@ -130,13 +130,13 @@ public class Rules
AggregateValuesRule.INSTANCE
);
- // Rules from VolcanoPlanner's registerAbstractRelationalRules.
+ // Rules from VolcanoPlanner's registerAbstractRelationalRules, minus JoinCommuteRule since it's already
+ // in DEFAULT_RULES.
private static final List<RelOptRule> VOLCANO_ABSTRACT_RULES =
ImmutableList.of(
FilterJoinRule.FILTER_ON_JOIN,
FilterJoinRule.JOIN,
AbstractConverter.ExpandConversionRule.INSTANCE,
- JoinCommuteRule.INSTANCE,
AggregateRemoveRule.INSTANCE,
UnionToDistinctRule.INSTANCE,
ProjectRemoveRule.INSTANCE,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
new file mode 100644
index 0000000..1142188
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rel;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.JoinDataSource;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.segment.join.JoinType;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.table.RowSignature;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * DruidRel that uses a {@link JoinDataSource}.
+ */
+public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
+{
+ private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__join__");
+ private static final double COST_FACTOR = 100.0;
+
+ private final PartialDruidQuery partialQuery;
+ private final Join joinRel;
+ private RelNode left;
+ private RelNode right;
+
+ private DruidJoinQueryRel(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ Join joinRel,
+ PartialDruidQuery partialQuery,
+ QueryMaker queryMaker
+ )
+ {
+ super(cluster, traitSet, queryMaker);
+ this.joinRel = joinRel;
+ this.left = joinRel.getLeft();
+ this.right = joinRel.getRight();
+ this.partialQuery = partialQuery;
+ }
+
+ public static DruidJoinQueryRel create(final Join joinRel, final QueryMaker queryMaker)
+ {
+ return new DruidJoinQueryRel(
+ joinRel.getCluster(),
+ joinRel.getTraitSet(),
+ joinRel,
+ PartialDruidQuery.create(joinRel),
+ queryMaker
+ );
+ }
+
+ @Override
+ public PartialDruidQuery getPartialDruidQuery()
+ {
+ return partialQuery;
+ }
+
+ @Override
+ public Sequence<Object[]> runQuery()
+ {
+ // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
+ // is the outermost query and it will actually get run as a native query. Druid's native query layer will
+ // finalize aggregations for the outermost query even if we don't explicitly ask it to.
+
+ final DruidQuery query = toDruidQuery(false);
+ return getQueryMaker().runQuery(query);
+ }
+
+ @Override
+ public DruidJoinQueryRel withPartialQuery(final PartialDruidQuery newQueryBuilder)
+ {
+ return new DruidJoinQueryRel(
+ getCluster(),
+ getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
+ joinRel,
+ newQueryBuilder,
+ getQueryMaker()
+ );
+ }
+
+ @Override
+ public int getQueryCount()
+ {
+ return ((DruidRel<?>) left).getQueryCount() + ((DruidRel<?>) right).getQueryCount();
+ }
+
+ @Override
+ public DruidQuery toDruidQuery(final boolean finalizeAggregations)
+ {
+ final DruidRel<?> leftDruidRel = (DruidRel<?>) left;
+ final DruidQuery leftQuery = Preconditions.checkNotNull((leftDruidRel).toDruidQuery(false), "leftQuery");
+ final RowSignature leftSignature = leftQuery.getOutputRowSignature();
+ final DataSource leftDataSource;
+
+ final DruidRel<?> rightDruidRel = (DruidRel<?>) right;
+ final DruidQuery rightQuery = Preconditions.checkNotNull(rightDruidRel.toDruidQuery(false), "rightQuery");
+ final RowSignature rightSignature = rightQuery.getOutputRowSignature();
+ final DataSource rightDataSource;
+
+ // Left rel: allow direct embedding of scans/mappings including those of joins.
+ if (DruidRels.isScanOrMapping(leftDruidRel, true)) {
+ leftDataSource = leftQuery.getDataSource();
+ } else {
+ leftDataSource = new QueryDataSource(leftQuery.getQuery());
+ }
+
+ // Right rel: allow direct embedding of scans/mappings, excluding joins (those must be done as subqueries).
+ if (DruidRels.isScanOrMapping(rightDruidRel, false)) {
+ rightDataSource = rightQuery.getDataSource();
+ } else {
+ rightDataSource = new QueryDataSource(rightQuery.getQuery());
+ }
+
+ final Pair<String, RowSignature> prefixSignaturePair = computeJoinRowSignature(leftSignature, rightSignature);
+
+ // Generate the condition for this join as a Druid expression.
+ final DruidExpression condition = Expressions.toDruidExpression(
+ getPlannerContext(),
+ prefixSignaturePair.rhs,
+ joinRel.getCondition()
+ );
+
+ // DruidJoinRule should not have created us if "condition" is null. Check defensively anyway, which also
+ // quiets static code analysis.
+ if (condition == null) {
+ throw new CannotBuildQueryException(joinRel, joinRel.getCondition());
+ }
+
+ return partialQuery.build(
+ JoinDataSource.create(
+ leftDataSource,
+ rightDataSource,
+ prefixSignaturePair.lhs,
+ condition.getExpression(),
+ toDruidJoinType(joinRel.getJoinType()),
+ getPlannerContext().getExprMacroTable()
+ ),
+ prefixSignaturePair.rhs,
+ getPlannerContext(),
+ getCluster().getRexBuilder(),
+ finalizeAggregations
+ );
+ }
+
+ @Override
+ public DruidQuery toDruidQueryForExplaining()
+ {
+ return partialQuery.build(
+ DUMMY_DATA_SOURCE,
+ RowSignature.from(
+ joinRel.getRowType().getFieldNames(),
+ joinRel.getRowType()
+ ),
+ getPlannerContext(),
+ getCluster().getRexBuilder(),
+ false
+ );
+ }
+
+ @Override
+ public DruidJoinQueryRel asDruidConvention()
+ {
+ return new DruidJoinQueryRel(
+ getCluster(),
+ getTraitSet().replace(DruidConvention.instance()),
+ joinRel.copy(
+ joinRel.getTraitSet(),
+ joinRel.getInputs()
+ .stream()
+ .map(input -> RelOptRule.convert(input, DruidConvention.instance()))
+ .collect(Collectors.toList())
+ ),
+ partialQuery,
+ getQueryMaker()
+ );
+ }
+
+ @Override
+ public List<RelNode> getInputs()
+ {
+ return ImmutableList.of(left, right);
+ }
+
+ @Override
+ public void replaceInput(int ordinalInParent, RelNode p)
+ {
+ joinRel.replaceInput(ordinalInParent, p);
+
+ if (ordinalInParent == 0) {
+ this.left = p;
+ } else if (ordinalInParent == 1) {
+ this.right = p;
+ } else {
+ throw new IndexOutOfBoundsException(StringUtils.format("Invalid ordinalInParent[%s]", ordinalInParent));
+ }
+ }
+
+ @Override
+ public List<RexNode> getChildExps()
+ {
+ return ImmutableList.of(joinRel.getCondition());
+ }
+
+ @Override
+ public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs)
+ {
+ return new DruidJoinQueryRel(
+ getCluster(),
+ traitSet,
+ joinRel.copy(joinRel.getTraitSet(), inputs),
+ getPartialDruidQuery(),
+ getQueryMaker()
+ );
+ }
+
+ @Override
+ public Set<String> getDataSourceNames()
+ {
+ final Set<String> retVal = new HashSet<>();
+ retVal.addAll(((DruidRel<?>) left).getDataSourceNames());
+ retVal.addAll(((DruidRel<?>) right).getDataSourceNames());
+ return retVal;
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw)
+ {
+ final String queryString;
+ final DruidQuery druidQuery = toDruidQueryForExplaining();
+
+ try {
+ queryString = getQueryMaker().getJsonMapper().writeValueAsString(druidQuery.getQuery());
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+
+ return pw.input("left", left)
+ .input("right", right)
+ .item("condition", joinRel.getCondition())
+ .item("joinType", joinRel.getJoinType())
+ .item("query", queryString)
+ .item("signature", druidQuery.getOutputRowSignature());
+ }
+
+ @Override
+ protected RelDataType deriveRowType()
+ {
+ return partialQuery.getRowType();
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
+ {
+ return planner.getCostFactory()
+ .makeCost(mq.getRowCount(left), 0, 0)
+ .plus(planner.getCostFactory().makeCost(mq.getRowCount(right), 0, 0))
+ .multiplyBy(COST_FACTOR);
+ }
+
+ private static JoinType toDruidJoinType(JoinRelType calciteJoinType)
+ {
+ switch (calciteJoinType) {
+ case LEFT:
+ return JoinType.LEFT;
+ case RIGHT:
+ return JoinType.RIGHT;
+ case FULL:
+ return JoinType.FULL;
+ case INNER:
+ return JoinType.INNER;
+ default:
+ throw new IAE("Cannot handle joinType[%s]", calciteJoinType);
+ }
+ }
+
+ /**
+ * Returns a Pair of "rightPrefix" (for JoinDataSource) and the signature of rows that will result from
+ * applying that prefix.
+ */
+ private static Pair<String, RowSignature> computeJoinRowSignature(
+ final RowSignature leftSignature,
+ final RowSignature rightSignature
+ )
+ {
+ final RowSignature.Builder signatureBuilder = RowSignature.builder();
+
+ for (final String column : leftSignature.getRowOrder()) {
+ signatureBuilder.add(column, leftSignature.getColumnType(column));
+ }
+
+ // Need to include the "0" since findUnusedPrefixForDigits only guarantees safety for digit-initiated suffixes
+ final String rightPrefix = Calcites.findUnusedPrefixForDigits("j", leftSignature.getRowOrder()) + "0.";
+
+ for (final String column : rightSignature.getRowOrder()) {
+ signatureBuilder.add(rightPrefix + column, rightSignature.getColumnType(column));
+ }
+
+ return Pair.of(rightPrefix, signatureBuilder.build());
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java
index 934361a..d80bca6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java
@@ -44,7 +44,7 @@ import java.util.List;
import java.util.Set;
/**
- * DruidRel that uses a "query" dataSource.
+ * DruidRel that uses a {@link QueryDataSource}.
*/
public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
{
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index b976960..92392cf 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -117,7 +117,32 @@ public class DruidQuery
private final RelDataType outputRowType;
private final VirtualColumnRegistry virtualColumnRegistry;
- public DruidQuery(
+ private DruidQuery(
+ final DataSource dataSource,
+ final PlannerContext plannerContext,
+ @Nullable final DimFilter filter,
+ @Nullable final Projection selectProjection,
+ @Nullable final Grouping grouping,
+ @Nullable final Sorting sorting,
+ final RowSignature sourceRowSignature,
+ final RelDataType outputRowType,
+ final VirtualColumnRegistry virtualColumnRegistry
+ )
+ {
+ this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+ this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext");
+ this.filter = filter;
+ this.selectProjection = selectProjection;
+ this.grouping = grouping;
+ this.sorting = sorting;
+ this.sourceRowSignature = Preconditions.checkNotNull(sourceRowSignature, "sourceRowSignature");
+ this.outputRowSignature = computeOutputRowSignature(sourceRowSignature, selectProjection, grouping, sorting);
+ this.outputRowType = Preconditions.checkNotNull(outputRowType, "outputRowType");
+ this.virtualColumnRegistry = Preconditions.checkNotNull(virtualColumnRegistry, "virtualColumnRegistry");
+ this.query = computeQuery();
+ }
+
+ public static DruidQuery fromPartialQuery(
final PartialDruidQuery partialQuery,
final DataSource dataSource,
final RowSignature sourceRowSignature,
@@ -126,15 +151,17 @@ public class DruidQuery
final boolean finalizeAggregations
)
{
- this.dataSource = dataSource;
- this.outputRowType = partialQuery.leafRel().getRowType();
- this.sourceRowSignature = sourceRowSignature;
- this.virtualColumnRegistry = VirtualColumnRegistry.create(sourceRowSignature);
- this.plannerContext = plannerContext;
+ final RelDataType outputRowType = partialQuery.leafRel().getRowType();
+ final VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(sourceRowSignature);
// Now the fun begins.
+ final DimFilter filter;
+ final Projection selectProjection;
+ final Grouping grouping;
+ final Sorting sorting;
+
if (partialQuery.getWhereFilter() != null) {
- this.filter = Preconditions.checkNotNull(
+ filter = Preconditions.checkNotNull(
computeWhereFilter(
partialQuery,
plannerContext,
@@ -143,55 +170,64 @@ public class DruidQuery
)
);
} else {
- this.filter = null;
+ filter = null;
}
// Only compute "selectProjection" if this is a non-aggregating query. (For aggregating queries, "grouping" will
// reflect select-project from partialQuery on its own.)
if (partialQuery.getSelectProject() != null && partialQuery.getAggregate() == null) {
- this.selectProjection = Preconditions.checkNotNull(
+ selectProjection = Preconditions.checkNotNull(
computeSelectProjection(
partialQuery,
plannerContext,
- computeOutputRowSignature(),
+ computeOutputRowSignature(sourceRowSignature, null, null, null),
virtualColumnRegistry
)
);
} else {
- this.selectProjection = null;
+ selectProjection = null;
}
if (partialQuery.getAggregate() != null) {
- this.grouping = Preconditions.checkNotNull(
+ grouping = Preconditions.checkNotNull(
computeGrouping(
partialQuery,
plannerContext,
- computeOutputRowSignature(),
+ computeOutputRowSignature(sourceRowSignature, selectProjection, null, null),
virtualColumnRegistry,
rexBuilder,
finalizeAggregations
)
);
} else {
- this.grouping = null;
+ grouping = null;
}
if (partialQuery.getSort() != null) {
- this.sorting = Preconditions.checkNotNull(
+ sorting = Preconditions.checkNotNull(
computeSorting(
partialQuery,
plannerContext,
- computeOutputRowSignature(),
+ computeOutputRowSignature(sourceRowSignature, selectProjection, grouping, null),
// When sorting follows grouping, virtual columns cannot be used
partialQuery.getAggregate() != null ? null : virtualColumnRegistry
)
);
} else {
- this.sorting = null;
+ sorting = null;
}
- this.outputRowSignature = computeOutputRowSignature();
- this.query = computeQuery();
+ return new DruidQuery(
+ dataSource,
+ plannerContext,
+ filter,
+ selectProjection,
+ grouping,
+ sorting,
+ sourceRowSignature,
+ outputRowType,
+ virtualColumnRegistry
+ );
}
@Nonnull
@@ -357,7 +393,7 @@ public class DruidQuery
{
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
final List<DimensionExpression> dimensions = new ArrayList<>();
- final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(rowSignature.getRowOrder()));
+ final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("d", new TreeSet<>(rowSignature.getRowOrder()));
int outputNameCounter = 0;
for (int i : aggregate.getGroupSet()) {
@@ -426,7 +462,7 @@ public class DruidQuery
{
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
final List<Aggregation> aggregations = new ArrayList<>();
- final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(rowSignature.getRowOrder()));
+ final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("a", new TreeSet<>(rowSignature.getRowOrder()));
for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
final String aggName = outputNamePrefix + i;
@@ -525,6 +561,29 @@ public class DruidQuery
return Sorting.create(orderBys, limit, projection);
}
+ /**
+ * Return the {@link RowSignature} corresponding to the output of a query with the given parameters.
+ */
+ private static RowSignature computeOutputRowSignature(
+ final RowSignature sourceRowSignature,
+ @Nullable final Projection selectProjection,
+ @Nullable final Grouping grouping,
+ @Nullable final Sorting sorting
+ )
+ {
+ if (sorting != null && sorting.getProjection() != null) {
+ return sorting.getProjection().getOutputRowSignature();
+ } else if (grouping != null) {
+ // Sanity check: cannot have both "grouping" and "selectProjection".
+ Preconditions.checkState(selectProjection == null, "Cannot have both 'grouping' and 'selectProjection'");
+ return grouping.getOutputRowSignature();
+ } else if (selectProjection != null) {
+ return selectProjection.getOutputRowSignature();
+ } else {
+ return sourceRowSignature;
+ }
+ }
+
private VirtualColumns getVirtualColumns(final boolean includeDimensions)
{
// 'sourceRowSignature' could provide a list of all defined virtual columns while constructing a query, but we
@@ -570,6 +629,11 @@ public class DruidQuery
return VirtualColumns.create(columns);
}
+ public DataSource getDataSource()
+ {
+ return dataSource;
+ }
+
@Nullable
public Grouping getGrouping()
{
@@ -592,26 +656,6 @@ public class DruidQuery
}
/**
- * Return the {@link RowSignature} corresponding to the output of this query. This method may be called during
- * construction, in which case it returns the output row signature at whatever phase of construction this method
- * is called at. At the end of construction, the final result is assigned to {@link #outputRowSignature}.
- */
- private RowSignature computeOutputRowSignature()
- {
- if (sorting != null && sorting.getProjection() != null) {
- return sorting.getProjection().getOutputRowSignature();
- } else if (grouping != null) {
- // Sanity check: cannot have both "grouping" and "selectProjection".
- Preconditions.checkState(selectProjection == null, "Cannot have both 'grouping' and 'selectProjection'");
- return grouping.getOutputRowSignature();
- } else if (selectProjection != null) {
- return selectProjection.getOutputRowSignature();
- } else {
- return sourceRowSignature;
- }
- }
-
- /**
* Return this query as some kind of Druid query. The returned query will either be {@link TopNQuery},
* {@link TimeseriesQuery}, {@link GroupByQuery}, {@link ScanQuery}
*
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java
index fed6d88..a6fa53c 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java
@@ -38,7 +38,7 @@ import javax.annotation.Nonnull;
import java.util.Set;
/**
- * DruidRel that uses a "table" dataSource.
+ * DruidRel that operates on top of a {@link DruidTable} directly (no joining or subqueries).
*/
public class DruidQueryRel extends DruidRel<DruidQueryRel>
{
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java
new file mode 100644
index 0000000..2ca30d8
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rel;
+
+import org.apache.druid.query.DataSource;
+import org.apache.druid.sql.calcite.table.DruidTable;
+
+import java.util.Optional;
+
+public class DruidRels
+{
+ /**
+ * Returns the DataSource involved in a leaf query of class {@link DruidQueryRel}.
+ */
+ public static Optional<DataSource> dataSourceIfLeafRel(final DruidRel<?> druidRel)
+ {
+ if (druidRel instanceof DruidQueryRel) {
+ return Optional.of(druidRel.getTable().unwrap(DruidTable.class).getDataSource());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Check if a druidRel is a simple table scan, or a projection that merely remaps columns without transforming them.
+ * Like {@link #isScanOrProject} but more restrictive: only remappings are allowed.
+ *
+ * @param druidRel the rel to check
+ * @param canBeJoin consider a 'join' that doesn't do anything fancy to be a scan-or-mapping too.
+ */
+ public static boolean isScanOrMapping(final DruidRel<?> druidRel, final boolean canBeJoin)
+ {
+ if (isScanOrProject(druidRel, canBeJoin)) {
+ // Like isScanOrProject, but don't allow transforming projections.
+ final PartialDruidQuery partialQuery = druidRel.getPartialDruidQuery();
+ return partialQuery.getSelectProject() == null || partialQuery.getSelectProject().isMapping();
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Check if a druidRel is a simple table scan or a scan + projection.
+ *
+ * @param druidRel the rel to check
+ * @param canBeJoin consider a 'join' that doesn't do anything fancy to be a scan-or-mapping too.
+ */
+ private static boolean isScanOrProject(final DruidRel<?> druidRel, final boolean canBeJoin)
+ {
+ if (druidRel instanceof DruidQueryRel || (canBeJoin && druidRel instanceof DruidJoinQueryRel)) {
+ final PartialDruidQuery partialQuery = druidRel.getPartialDruidQuery();
+ final PartialDruidQuery.Stage stage = partialQuery.stage();
+ return (stage == PartialDruidQuery.Stage.SCAN || stage == PartialDruidQuery.Stage.SELECT_PROJECT)
+ && partialQuery.getWhereFilter() == null;
+ } else {
+ return false;
+ }
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
index e9473c6..7738c92 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
@@ -103,7 +103,7 @@ public class PartialDruidQuery
{
final Supplier<RelBuilder> builderSupplier = () -> RelFactories.LOGICAL_BUILDER.create(
scanRel.getCluster(),
- scanRel.getTable().getRelOptSchema()
+ scanRel.getTable() != null ? scanRel.getTable().getRelOptSchema() : null
);
return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null);
}
@@ -303,7 +303,14 @@ public class PartialDruidQuery
final boolean finalizeAggregations
)
{
- return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder, finalizeAggregations);
+ return DruidQuery.fromPartialQuery(
+ this,
+ dataSource,
+ sourceRowSignature,
+ plannerContext,
+ rexBuilder,
+ finalizeAggregations
+ );
}
public boolean canAccept(final Stage stage)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java
index 41b2f41..16b6792 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java
@@ -197,7 +197,7 @@ public class Projection
)
{
final List<String> rowOrder = new ArrayList<>();
- final String outputNamePrefix = Calcites.findUnusedPrefix(
+ final String outputNamePrefix = Calcites.findUnusedPrefixForDigits(
basePrefix,
new TreeSet<>(inputRowSignature.getRowOrder())
);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java
index dc35391..a4a0d25 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java
@@ -60,7 +60,7 @@ public class VirtualColumnRegistry
{
return new VirtualColumnRegistry(
rowSignature,
- Calcites.findUnusedPrefix("v", new TreeSet<>(rowSignature.getRowOrder())),
+ Calcites.findUnusedPrefixForDigits("v", new TreeSet<>(rowSignature.getRowOrder())),
new HashMap<>(),
new HashMap<>()
);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java
new file mode 100644
index 0000000..a7ba42e
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rule;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
+import org.apache.druid.sql.calcite.rel.DruidRel;
+import org.apache.druid.sql.calcite.rel.DruidRels;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+public class DruidJoinRule extends RelOptRule
+{
+ private static final DruidJoinRule INSTANCE = new DruidJoinRule();
+
+ private DruidJoinRule()
+ {
+ super(
+ operand(
+ Join.class,
+ operand(DruidRel.class, none()),
+ operand(DruidRel.class, none())
+ )
+ );
+ }
+
+ public static DruidJoinRule instance()
+ {
+ return INSTANCE;
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call)
+ {
+ final Join join = call.rel(0);
+ final DruidRel<?> left = call.rel(1);
+ final DruidRel<?> right = call.rel(2);
+
+ // 1) Condition must be handleable.
+ // 2) Left must be a scan or a join.
+ // 3) If left is not a join, it must be concrete.
+ // 4) Right must be a scan (and *cannot* be a join; we want to generate left-heavy trees).
+ // 5) Right must be global.
+ return
+ canHandleCondition(join.getCondition(), join.getLeft().getRowType())
+ && DruidRels.isScanOrMapping(left, true)
+ && DruidRels.isScanOrMapping(right, false)
+ && (left instanceof DruidJoinQueryRel
+ || DruidRels.dataSourceIfLeafRel(left).filter(DataSource::isConcrete).isPresent())
+ && DruidRels.dataSourceIfLeafRel(right).filter(DataSource::isGlobal).isPresent();
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call)
+ {
+ final Join join = call.rel(0);
+ final DruidRel<?> left = call.rel(1);
+
+ // Preconditions were already verified in "matches".
+ call.transformTo(
+ DruidJoinQueryRel.create(join, left.getQueryMaker())
+ );
+ }
+
+ /**
+ * Returns true if this condition is an AND of equality conditions of the form: f(LeftRel) = RightColumn.
+ *
+ * @see org.apache.druid.segment.join.JoinConditionAnalysis where "equiCondition" is the same concept.
+ */
+ @VisibleForTesting
+ static boolean canHandleCondition(final RexNode condition, final RelDataType leftRowType)
+ {
+ final List<RexNode> subConditions = decomposeAnd(condition);
+
+ for (RexNode subCondition : subConditions) {
+ if (subCondition.isA(SqlKind.LITERAL)) {
+ // Literals are always OK.
+ continue;
+ }
+
+ if (!subCondition.isA(SqlKind.EQUALS)) {
+ // If it's not EQUALS, it's not supported.
+ return false;
+ }
+
+ final List<RexNode> operands = ((RexCall) subCondition).getOperands();
+ Preconditions.checkState(operands.size() == 2, "Expected 2 operands, got[%,d]", operands.size());
+
+ final int numLeftFields = leftRowType.getFieldList().size();
+
+ final boolean rhsIsFieldOfRightRel =
+ operands.get(1).isA(SqlKind.INPUT_REF)
+ && ((RexInputRef) operands.get(1)).getIndex() >= numLeftFields;
+
+ final boolean lhsIsExpressionOfLeftRel =
+ RelOptUtil.InputFinder.bits(operands.get(0)).intersects(ImmutableBitSet.range(numLeftFields));
+
+ if (!(lhsIsExpressionOfLeftRel && rhsIsFieldOfRightRel)) {
+ // Cannot handle this condition.
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @VisibleForTesting
+ static List<RexNode> decomposeAnd(final RexNode condition)
+ {
+ final List<RexNode> retVal = new ArrayList<>();
+ final Stack<RexNode> stack = new Stack<>();
+
+ stack.push(condition);
+
+ while (!stack.empty()) {
+ final RexNode current = stack.pop();
+
+ if (current.isA(SqlKind.AND)) {
+ final List<RexNode> operands = ((RexCall) current).getOperands();
+
+ // Add right-to-left, so when we unwind the stack, the operands are in the original order.
+ for (int i = operands.size() - 1; i >= 0; i--) {
+ stack.push(operands.get(i));
+ }
+ } else {
+ retVal.add(current);
+ }
+ }
+
+ return retVal;
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
index 21998f1..75f5038 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
@@ -90,7 +90,8 @@ public class DruidRules
DruidOuterQueryRule.PROJECT_AGGREGATE,
DruidOuterQueryRule.AGGREGATE_SORT_PROJECT,
DruidUnionRule.instance(),
- DruidSortUnionRule.instance()
+ DruidSortUnionRule.instance(),
+ DruidJoinRule.instance()
);
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java
new file mode 100644
index 0000000..0fd414b
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.druid.query.LookupDataSource;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.join.lookup.LookupColumnSelectorFactory;
+import org.apache.druid.sql.calcite.table.DruidTable;
+import org.apache.druid.sql.calcite.table.RowSignature;
+
+import java.util.Map;
+
+/**
+ * Creates the "lookup" schema in Druid SQL, composed of all available {@link LookupDataSource}.
+ */
+public class LookupSchema extends AbstractSchema
+{
+ public static final String NAME = "lookup";
+
+ private static final RowSignature ROW_SIGNATURE =
+ RowSignature.builder()
+ .add(LookupColumnSelectorFactory.KEY_COLUMN, ValueType.STRING)
+ .add(LookupColumnSelectorFactory.VALUE_COLUMN, ValueType.STRING)
+ .build();
+
+ private final LookupExtractorFactoryContainerProvider lookupProvider;
+
+ @Inject
+ public LookupSchema(final LookupExtractorFactoryContainerProvider lookupProvider)
+ {
+ this.lookupProvider = lookupProvider;
+ }
+
+ @Override
+ protected Map<String, Table> getTableMap()
+ {
+ final ImmutableMap.Builder<String, Table> tableMapBuilder = ImmutableMap.builder();
+
+ for (final String lookupName : lookupProvider.getAllLookupNames()) {
+ tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE));
+ }
+
+ return tableMapBuilder.build();
+ }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 5d31868..0aee606 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -187,6 +187,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
testRequestLogger = new TestRequestLogger();
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
@@ -827,6 +828,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
index 9cc2ecf..aad9e2d 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
@@ -93,6 +93,7 @@ public class DruidStatementTest extends CalciteTestBase
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index e82fcae..446f946 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -32,7 +32,9 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
@@ -58,11 +60,13 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -94,10 +98,13 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class BaseCalciteQueryTest extends CalciteTestBase
{
- public static final String NULL_VALUE = NullHandling.replaceWithDefault() ? "" : null;
+ public static final String NULL_STRING = NullHandling.defaultStringValue();
+ public static final Float NULL_FLOAT = NullHandling.defaultFloatValue();
+ public static final Long NULL_LONG = NullHandling.defaultLongValue();
public static final String HLLC_STRING = VersionOneHyperLogLogCollector.class.getName();
public static final Logger log = new Logger(BaseCalciteQueryTest.class);
@@ -329,7 +336,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
false,
true,
null,
- null,
+ null,
StringComparators.NUMERIC
);
}
@@ -363,6 +370,29 @@ public class BaseCalciteQueryTest extends CalciteTestBase
return new ExpressionVirtualColumn(name, expression, outputType, CalciteTests.createExprMacroTable());
}
+ public static JoinDataSource join(
+ DataSource left,
+ DataSource right,
+ String rightPrefix,
+ String condition,
+ JoinType joinType
+ )
+ {
+ return JoinDataSource.create(
+ left,
+ right,
+ rightPrefix,
+ condition,
+ joinType,
+ CalciteTests.createExprMacroTable()
+ );
+ }
+
+ public static String equalsCondition(DruidExpression left, DruidExpression right)
+ {
+ return StringUtils.format("(%s == %s)", left.getExpression(), right.getExpression());
+ }
+
public static ExpressionPostAggregator expressionPostAgg(final String name, final String expression)
{
return new ExpressionPostAggregator(name, expression, null, CalciteTests.createExprMacroTable());
@@ -371,7 +401,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public static Druids.ScanQueryBuilder newScanQueryBuilder()
{
return new Druids.ScanQueryBuilder().resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
- .legacy(false);
+ .legacy(false);
}
@BeforeClass
@@ -501,17 +531,31 @@ public class BaseCalciteQueryTest extends CalciteTestBase
testQuery(plannerConfig, QUERY_CONTEXT_DEFAULT, sql, authenticationResult, expectedQueries, expectedResults);
}
- private Query recursivelyOverrideContext(final Query q, final Map<String, Object> context)
+ /**
+ * Override not just the outer query context, but also the contexts of all subqueries.
+ */
+ private <T> Query<T> recursivelyOverrideContext(final Query<T> query, final Map<String, Object> context)
{
- final Query q2;
- if (q.getDataSource() instanceof QueryDataSource) {
- final Query subQuery = ((QueryDataSource) q.getDataSource()).getQuery();
- q2 = q.withDataSource(new QueryDataSource(recursivelyOverrideContext(subQuery, context)));
+ return query.withDataSource(recursivelyOverrideContext(query.getDataSource(), context))
+ .withOverriddenContext(context);
+ }
+
+ /**
+ * Override the contexts of all subqueries of a particular datasource.
+ */
+ private DataSource recursivelyOverrideContext(final DataSource dataSource, final Map<String, Object> context)
+ {
+ if (dataSource instanceof QueryDataSource) {
+ final Query subquery = ((QueryDataSource) dataSource).getQuery();
+ return new QueryDataSource(recursivelyOverrideContext(subquery, context));
} else {
- q2 = q;
+ return dataSource.withChildren(
+ dataSource.getChildren()
+ .stream()
+ .map(ds -> recursivelyOverrideContext(ds, context))
+ .collect(Collectors.toList())
+ );
}
-
- return q2.withOverriddenContext(context);
}
public void testQuery(
@@ -595,6 +639,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 74e033d..3301b87 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -32,8 +32,10 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -82,6 +84,7 @@ import org.apache.druid.query.topn.InvertedTopNMetricSpec;
import org.apache.druid.query.topn.NumericTopNMetricSpec;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.join.JoinType;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.Calcites;
@@ -318,6 +321,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
"SELECT DISTINCT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA",
ImmutableList.of(),
ImmutableList.of(
+ new Object[]{"lookup"},
new Object[]{"druid"},
new Object[]{"sys"},
new Object[]{"INFORMATION_SCHEMA"}
@@ -344,6 +348,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
.add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
+ .add(new Object[]{"lookup", "lookyloo", "TABLE"})
.add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
@@ -371,6 +376,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
.add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
+ .add(new Object[]{"lookup", "lookyloo", "TABLE"})
.add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
@@ -488,7 +494,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test
public void testSelectStar() throws Exception
{
- String hyperLogLogCollectorClassName = HLLC_STRING;
testQuery(
PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
QUERY_CONTEXT_DEFAULT,
@@ -504,23 +509,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
- new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1f, 1.0, hyperLogLogCollectorClassName},
- new Object[]{
- timestamp("2000-01-02"), 1L, "10.1", NULL_VALUE, "[\"b\",\"c\"]", 2f, 2.0, hyperLogLogCollectorClassName
- },
- new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, hyperLogLogCollectorClassName},
- new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4f, 4.0, hyperLogLogCollectorClassName},
- new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5f, 5.0, hyperLogLogCollectorClassName},
- new Object[]{
- timestamp("2001-01-03"),
- 1L,
- "abc",
- NULL_VALUE,
- NULL_VALUE,
- 6f,
- 6.0,
- hyperLogLogCollectorClassName
- }
+ new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1f, 1.0, HLLC_STRING},
+ new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2f, 2.0, HLLC_STRING},
+ new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, HLLC_STRING},
+ new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4f, 4.0, HLLC_STRING},
+ new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5f, 5.0, HLLC_STRING},
+ new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_STRING, NULL_STRING, 6f, 6.0, HLLC_STRING}
)
);
}
@@ -617,7 +611,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
),
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1.0f, 1.0, HLLC_STRING},
- new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_VALUE, "[\"b\",\"c\"]", 2.0f, 2.0, HLLC_STRING}
+ new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2.0f, 2.0, HLLC_STRING}
)
);
}
@@ -642,7 +636,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
),
ImmutableList.of(
new Object[]{"a"},
- new Object[]{NULL_VALUE}
+ new Object[]{NULL_STRING}
)
);
}
@@ -691,8 +685,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
- new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_VALUE, NULL_VALUE, 6f, 6d, HLLC_STRING},
- new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5f, 5d, HLLC_STRING}
+ new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_STRING, NULL_STRING, 6f, 6d, HLLC_STRING},
+ new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5f, 5d, HLLC_STRING}
)
);
}
@@ -718,11 +712,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
),
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1f, 1.0, HLLC_STRING},
- new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_VALUE, "[\"b\",\"c\"]", 2f, 2.0, HLLC_STRING},
+ new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2f, 2.0, HLLC_STRING},
new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, HLLC_STRING},
new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4f, 4.0, HLLC_STRING},
- new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5f, 5.0, HLLC_STRING},
- new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_VALUE, NULL_VALUE, 6f, 6.0, HLLC_STRING}
+ new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5f, 5.0, HLLC_STRING},
+ new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_STRING, NULL_STRING, 6f, 6.0, HLLC_STRING}
)
);
}
@@ -744,7 +738,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
),
ImmutableList.of(
new Object[]{"a", "a"},
- new Object[]{NULL_VALUE, NULL_VALUE}
+ new Object[]{NULL_STRING, NULL_STRING}
)
);
}
@@ -2612,10 +2606,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
- new Object[]{"", NULL_VALUE},
- new Object[]{"1", NULL_VALUE},
+ new Object[]{"", NULL_STRING},
+ new Object[]{"1", NULL_STRING},
new Object[]{"10.1", "0.1"},
- new Object[]{"2", NULL_VALUE},
+ new Object[]{"2", NULL_STRING},
new Object[]{"abc", "bc"},
new Object[]{"def", "ef"}
)
@@ -2664,9 +2658,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new Object[]{"10.1", "0.1"},
new Object[]{"abc", "bc"},
new Object[]{"def", "ef"},
- new Object[]{"1", NULL_VALUE},
- new Object[]{"2", NULL_VALUE},
- new Object[]{"", NULL_VALUE}
+ new Object[]{"1", NULL_STRING},
+ new Object[]{"2", NULL_STRING},
+ new Object[]{"", NULL_STRING}
)
);
}
@@ -2694,10 +2688,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
- new Object[]{"", NULL_VALUE},
- new Object[]{"1", NULL_VALUE},
+ new Object[]{"", NULL_STRING},
+ new Object[]{"1", NULL_STRING},
new Object[]{"10.1", "0.1"},
- new Object[]{"2", NULL_VALUE},
+ new Object[]{"2", NULL_STRING},
new Object[]{"abc", "bc"},
new Object[]{"def", "ef"}
)
@@ -2735,9 +2729,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new Object[]{"10.1", "0.1"},
new Object[]{"abc", "bc"},
new Object[]{"def", "ef"},
- new Object[]{"1", NULL_VALUE},
- new Object[]{"2", NULL_VALUE},
- new Object[]{"", NULL_VALUE}
+ new Object[]{"1", NULL_STRING},
+ new Object[]{"2", NULL_STRING},
+ new Object[]{"", NULL_STRING}
)
);
}
@@ -3452,7 +3446,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1.0f, 1.0d, HLLC_STRING},
new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4.0f, 4.0d, HLLC_STRING},
- new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5.0f, 5.0d, HLLC_STRING}
+ new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5.0f, 5.0d, HLLC_STRING}
)
);
}
@@ -7602,6 +7596,416 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
}
@Test
+ public void testFilterAndGroupByLookupUsingJoinOperatorAllowNulls() throws Exception
+ {
+ // Cannot vectorize JOIN operator.
+ cannotVectorize();
+
+ testQuery(
+ "SELECT lookyloo.v, COUNT(*)\n"
+ + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n"
+ + "WHERE lookyloo.v <> 'xa' OR lookyloo.v IS NULL\n"
+ + "GROUP BY lookyloo.v",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+ JoinType.LEFT
+ )
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimFilter(or(not(selector("j0.v", "xa", null)), selector("j0.v", null, null)))
+ .setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
+ .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{NULL_STRING, 3L},
+ new Object[]{"xabc", 1L}
+ )
+ );
+ }
+
+ @Test
+ public void testFilterAndGroupByLookupUsingJoinOperator() throws Exception
+ {
+ // Cannot vectorize JOIN operator.
+ cannotVectorize();
+
+ testQuery(
+ "SELECT lookyloo.v, COUNT(*)\n"
+ + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n"
+ + "WHERE lookyloo.v <> 'xa'\n"
+ + "GROUP BY lookyloo.v",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+ JoinType.LEFT
+ )
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setDimFilter(not(selector("j0.v", "xa", null)))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
+ .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{NULL_STRING, 3L},
+ new Object[]{"xabc", 1L}
+ )
+ );
+ }
+
+ @Test
+ public void testFilterAndGroupByLookupUsingJoinOperatorBackwards() throws Exception
+ {
+ // Cannot vectorize JOIN operator.
+ cannotVectorize();
+
+ // Like "testFilterAndGroupByLookupUsingJoinOperator", but with the table and lookup reversed.
+ testQuery(
+ "SELECT lookyloo.v, COUNT(*)\n"
+ + "FROM lookup.lookyloo RIGHT JOIN foo ON foo.dim2 = lookyloo.k\n"
+ + "WHERE lookyloo.v <> 'xa'\n"
+ + "GROUP BY lookyloo.v",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+ JoinType.LEFT
+ )
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setDimFilter(not(selector("j0.v", "xa", null)))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
+ .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{NULL_STRING, 3L},
+ new Object[]{"xabc", 1L}
+ )
+ );
+ }
+
+ @Test
+ public void testGroupByInnerJoinOnLookupUsingJoinOperator() throws Exception
+ {
+ // Cannot vectorize JOIN operator.
+ cannotVectorize();
+
+ testQuery(
+ "SELECT lookyloo.v, COUNT(*)\n"
+ + "FROM foo INNER JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+ + "GROUP BY lookyloo.v",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+ JoinType.INNER
+ )
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
+ .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"xabc", 1L}
+ )
+ );
+ }
+
+ @Test
+ public void testSelectOnLookupUsingInnerJoinOperator() throws Exception
+ {
+ testQuery(
+ "SELECT dim2, lookyloo.*\n"
+ + "FROM foo INNER JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n",
+ ImmutableList.of(
+ newScanQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+ JoinType.INNER
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("dim2", "j0.k", "j0.v")
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"a", "a", "xa"},
+ new Object[]{"a", "a", "xa"},
+ new Object[]{"abc", "abc", "xabc"}
+ )
+ );
+ }
+
+ @Test
+ public void testLeftJoinTwoLookupsUsingJoinOperator() throws Exception
+ {
+ testQuery(
+ "SELECT dim1, dim2, l1.v, l2.v\n"
+ + "FROM foo\n"
+ + "LEFT JOIN lookup.lookyloo l1 ON foo.dim1 = l1.k\n"
+ + "LEFT JOIN lookup.lookyloo l2 ON foo.dim2 = l2.k\n",
+ ImmutableList.of(
+ newScanQueryBuilder()
+ .dataSource(
+ join(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+ JoinType.LEFT
+ ),
+ new LookupDataSource("lookyloo"),
+ "_j0.",
+ equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("_j0.k")),
+ JoinType.LEFT
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("_j0.v", "dim1", "dim2", "j0.v")
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"", "a", NULL_STRING, "xa"},
+ new Object[]{"10.1", NULL_STRING, NULL_STRING, NULL_STRING},
+ new Object[]{"2", "", NULL_STRING, NULL_STRING},
+ new Object[]{"1", "a", NULL_STRING, "xa"},
+ new Object[]{"def", "abc", NULL_STRING, "xabc"},
+ new Object[]{"abc", NULL_STRING, "xabc", NULL_STRING}
+ )
+ );
+ }
+
+ @Test
+ public void testLeftJoinLookupOntoLookupUsingJoinOperator() throws Exception
+ {
+ testQuery(
+ "SELECT dim2, l1.v, l2.v\n"
+ + "FROM foo\n"
+ + "LEFT JOIN lookup.lookyloo l1 ON foo.dim2 = l1.k\n"
+ + "LEFT JOIN lookup.lookyloo l2 ON l1.k = l2.k",
+ ImmutableList.of(
+ newScanQueryBuilder()
+ .dataSource(
+ join(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+ JoinType.LEFT
+ ),
+ new LookupDataSource("lookyloo"),
+ "_j0.",
+ equalsCondition(DruidExpression.fromColumn("j0.k"), DruidExpression.fromColumn("_j0.k")),
+ JoinType.LEFT
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("_j0.v", "dim2", "j0.v")
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"a", "xa", "xa"},
+ new Object[]{NULL_STRING, NULL_STRING, NULL_STRING},
+ new Object[]{"", NULL_STRING, NULL_STRING},
+ new Object[]{"a", "xa", "xa"},
+ new Object[]{"abc", "xabc", "xabc"},
+ new Object[]{NULL_STRING, NULL_STRING, NULL_STRING}
+ )
+ );
+ }
+
+ @Test
+ public void testLeftJoinThreeLookupsUsingJoinOperator() throws Exception
+ {
+ testQuery(
+ "SELECT dim1, dim2, l1.v, l2.v, l3.v\n"
+ + "FROM foo\n"
+ + "LEFT JOIN lookup.lookyloo l1 ON foo.dim1 = l1.k\n"
+ + "LEFT JOIN lookup.lookyloo l2 ON foo.dim2 = l2.k\n"
+ + "LEFT JOIN lookup.lookyloo l3 ON l2.k = l3.k",
+ ImmutableList.of(
+ newScanQueryBuilder()
+ .dataSource(
+ join(
+ join(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+ JoinType.LEFT
+ ),
+ new LookupDataSource("lookyloo"),
+ "_j0.",
+ equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("_j0.k")),
+ JoinType.LEFT
+ ),
+ new LookupDataSource("lookyloo"),
+ "__j0.",
+ equalsCondition(DruidExpression.fromColumn("_j0.k"), DruidExpression.fromColumn("__j0.k")),
+ JoinType.LEFT
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("__j0.v", "_j0.v", "dim1", "dim2", "j0.v")
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"", "a", NULL_STRING, "xa", "xa"},
+ new Object[]{"10.1", NULL_STRING, NULL_STRING, NULL_STRING, NULL_STRING},
+ new Object[]{"2", "", NULL_STRING, NULL_STRING, NULL_STRING},
+ new Object[]{"1", "a", NULL_STRING, "xa", "xa"},
+ new Object[]{"def", "abc", NULL_STRING, "xabc", "xabc"},
+ new Object[]{"abc", NULL_STRING, "xabc", NULL_STRING, NULL_STRING}
+ )
+ );
+ }
+
+ @Test
+ public void testSelectOnLookupUsingLeftJoinOperator() throws Exception
+ {
+ testQuery(
+ "SELECT dim1, lookyloo.*\n"
+ + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+ + "WHERE lookyloo.v <> 'xxx' OR lookyloo.v IS NULL",
+ ImmutableList.of(
+ newScanQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+ JoinType.LEFT
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .filters(or(not(selector("j0.v", "xxx", null)), selector("j0.v", null, null)))
+ .columns("dim1", "j0.k", "j0.v")
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"", NULL_STRING, NULL_STRING},
+ new Object[]{"10.1", NULL_STRING, NULL_STRING},
+ new Object[]{"2", NULL_STRING, NULL_STRING},
+ new Object[]{"1", NULL_STRING, NULL_STRING},
+ new Object[]{"def", NULL_STRING, NULL_STRING},
+ new Object[]{"abc", "abc", "xabc"}
+ )
+ );
+ }
+
+ @Test
+ public void testSelectOnLookupUsingRightJoinOperator() throws Exception
+ {
+ testQuery(
+ "SELECT dim1, lookyloo.*\n"
+ + "FROM foo RIGHT JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+ + "WHERE lookyloo.v <> 'xxx' OR lookyloo.v IS NULL",
+ ImmutableList.of(
+ newScanQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+ JoinType.RIGHT
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .filters(or(not(selector("j0.v", "xxx", null)), selector("j0.v", null, null)))
+ .columns("dim1", "j0.k", "j0.v")
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"abc", "abc", "xabc"},
+ new Object[]{NULL_STRING, "a", "xa"},
+ new Object[]{NULL_STRING, "nosuchkey", "mysteryvalue"}
+ )
+ );
+ }
+
+ @Test
+ public void testSelectOnLookupUsingFullJoinOperator() throws Exception
+ {
+ testQuery(
+ "SELECT dim1, m1, cnt, lookyloo.*\n"
+ + "FROM foo FULL JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+ + "WHERE lookyloo.v <> 'xxx' OR lookyloo.v IS NULL",
+ ImmutableList.of(
+ newScanQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+ JoinType.FULL
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .filters(or(not(selector("j0.v", "xxx", null)), selector("j0.v", null, null)))
+ .columns("cnt", "dim1", "j0.k", "j0.v", "m1")
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"", 1f, 1L, NULL_STRING, NULL_STRING},
+ new Object[]{"10.1", 2f, 1L, NULL_STRING, NULL_STRING},
+ new Object[]{"2", 3f, 1L, NULL_STRING, NULL_STRING},
+ new Object[]{"1", 4f, 1L, NULL_STRING, NULL_STRING},
+ new Object[]{"def", 5f, 1L, NULL_STRING, NULL_STRING},
+ new Object[]{"abc", 6f, 1L, "abc", "xabc"},
+ new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "a", "xa"},
+ new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "nosuchkey", "mysteryvalue"}
+ )
+ );
+ }
+
+ @Test
public void testCountDistinctOfLookup() throws Exception
{
// Cannot vectorize due to "cardinality" aggregator.
@@ -7642,6 +8046,46 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
}
@Test
+ public void testCountDistinctOfLookupUsingJoinOperator() throws Exception
+ {
+ // Cannot yet vectorize the JOIN operator.
+ cannotVectorize();
+
+ testQuery(
+ "SELECT COUNT(DISTINCT lookyloo.v)\n"
+ + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k",
+ ImmutableList.of(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+ JoinType.LEFT
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .aggregators(aggregators(
+ new CardinalityAggregatorFactory(
+ "a0",
+ null,
+ ImmutableList.of(DefaultDimensionSpec.of("j0.v")),
+ false,
+ true
+ )
+ ))
+ .context(TIMESERIES_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{NullHandling.replaceWithDefault() ? 2L : 1L}
+ )
+ );
+ }
+
+ @Test
public void testTimeseries() throws Exception
{
testQuery(
@@ -10648,8 +11092,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
);
}
-
-
+
+
@Test
public void testQueryContextOuterLimit() throws Exception
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
index d525fff..20d77bc 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
@@ -147,6 +147,7 @@ public class SqlResourceTest extends CalciteTestBase
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
+ CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java
index 763524e..676c0b7 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java
@@ -42,14 +42,14 @@ public class CalcitesTest extends CalciteTestBase
@Test
public void testFindUnusedPrefix()
{
- Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar")));
- Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
- Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
- Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
- Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
- Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
- Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
- Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
- Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar", "x")));
+ Assert.assertEquals("_x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar", "x0")));
+ Assert.assertEquals("_x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar", "x4")));
+ Assert.assertEquals("__x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "_xbxx")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "xa", "_x")));
+ Assert.assertEquals("__x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
}
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java
new file mode 100644
index 0000000..7e0ed30
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rel;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+public class DruidRelsTest
+{
+ @Test
+ public void test_isScanOrMapping_scan()
+ {
+ final DruidRel<?> rel = mockDruidRel(DruidQueryRel.class, PartialDruidQuery.Stage.SCAN, null, null);
+ Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertTrue(DruidRels.isScanOrMapping(rel, false));
+ EasyMock.verify(rel, rel.getPartialDruidQuery());
+ }
+
+ @Test
+ public void test_isScanOrMapping_scanJoin()
+ {
+ final DruidRel<?> rel = mockDruidRel(DruidJoinQueryRel.class, PartialDruidQuery.Stage.SCAN, null, null);
+ Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+ EasyMock.verify(rel, rel.getPartialDruidQuery());
+ }
+
+ @Test
+ public void test_isScanOrMapping_scanQuery()
+ {
+ final DruidRel<?> rel = mockDruidRel(DruidOuterQueryRel.class, PartialDruidQuery.Stage.SCAN, null, null);
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+ EasyMock.verify(rel, rel.getPartialDruidQuery());
+ }
+
+ @Test
+ public void test_isScanOrMapping_mapping()
+ {
+ final Project project = mockProject(true);
+ final DruidRel<?> rel = mockDruidRel(
+ DruidQueryRel.class,
+ PartialDruidQuery.Stage.SELECT_PROJECT,
+ project,
+ null
+ );
+ Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertTrue(DruidRels.isScanOrMapping(rel, false));
+
+ EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+ }
+
+ @Test
+ public void test_isScanOrMapping_mappingJoin()
+ {
+ final Project project = mockProject(true);
+ final DruidRel<?> rel = mockDruidRel(
+ DruidJoinQueryRel.class,
+ PartialDruidQuery.Stage.SELECT_PROJECT,
+ project,
+ null
+ );
+ Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+ EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+ }
+
+ @Test
+ public void test_isScanOrMapping_nonMapping()
+ {
+ final Project project = mockProject(false);
+ final DruidRel<?> rel = mockDruidRel(
+ DruidQueryRel.class,
+ PartialDruidQuery.Stage.SELECT_PROJECT,
+ project,
+ null
+ );
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+ EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+ }
+
+ @Test
+ public void test_isScanOrMapping_nonMappingJoin()
+ {
+ final Project project = mockProject(false);
+ final DruidRel<?> rel = mockDruidRel(
+ DruidJoinQueryRel.class,
+ PartialDruidQuery.Stage.SELECT_PROJECT,
+ project,
+ null
+ );
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+ EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+ }
+
+ @Test
+ public void test_isScanOrMapping_filterThenProject()
+ {
+ final Project project = mockProject(true);
+ final DruidRel<?> rel = mockDruidRel(
+ DruidQueryRel.class,
+ PartialDruidQuery.Stage.SELECT_PROJECT,
+ project,
+ mockFilter()
+ );
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+ EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+ }
+
+ @Test
+ public void test_isScanOrMapping_filterThenProjectJoin()
+ {
+ final Project project = mockProject(true);
+ final DruidRel<?> rel = mockDruidRel(
+ DruidJoinQueryRel.class,
+ PartialDruidQuery.Stage.SELECT_PROJECT,
+ project,
+ mockFilter()
+ );
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+ EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+ }
+
+ @Test
+ public void test_isScanOrMapping_filter()
+ {
+ final DruidRel<?> rel = mockDruidRel(
+ DruidQueryRel.class,
+ PartialDruidQuery.Stage.WHERE_FILTER,
+ null,
+ mockFilter()
+ );
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+ EasyMock.verify(rel, rel.getPartialDruidQuery());
+ }
+
+ @Test
+ public void test_isScanOrMapping_filterJoin()
+ {
+ final DruidRel<?> rel = mockDruidRel(
+ DruidJoinQueryRel.class,
+ PartialDruidQuery.Stage.WHERE_FILTER,
+ null,
+ mockFilter()
+ );
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+ Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+ EasyMock.verify(rel, rel.getPartialDruidQuery());
+ }
+
+ @Test
+ public void test_isScanOrMapping_allStages()
+ {
+ final ImmutableSet<PartialDruidQuery.Stage> okStages = ImmutableSet.of(
+ PartialDruidQuery.Stage.SCAN,
+ PartialDruidQuery.Stage.SELECT_PROJECT
+ );
+
+ for (PartialDruidQuery.Stage stage : PartialDruidQuery.Stage.values()) {
+ final Project project = mockProject(true);
+ final DruidRel<?> rel = mockDruidRel(
+ DruidQueryRel.class,
+ stage,
+ project,
+ null
+ );
+
+ Assert.assertEquals(stage.toString(), okStages.contains(stage), DruidRels.isScanOrMapping(rel, true));
+ Assert.assertEquals(stage.toString(), okStages.contains(stage), DruidRels.isScanOrMapping(rel, false));
+
+ EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+ }
+ }
+
+ private static DruidRel<?> mockDruidRel(
+ final Class<? extends DruidRel<?>> clazz,
+ final PartialDruidQuery.Stage stage,
+ @Nullable Project selectProject,
+ @Nullable Filter whereFilter
+ )
+ {
+ // DruidQueryRels rely on a ton of Calcite stuff like RelOptCluster, RelOptTable, etc, which is quite verbose to
+ // create real instances of. So, tragically, we'll use EasyMock.
+ final DruidRel<?> mockRel = EasyMock.mock(clazz);
+ final PartialDruidQuery mockPartialQuery = EasyMock.mock(PartialDruidQuery.class);
+ EasyMock.expect(mockPartialQuery.stage()).andReturn(stage).anyTimes();
+ EasyMock.expect(mockPartialQuery.getSelectProject()).andReturn(selectProject).anyTimes();
+ EasyMock.expect(mockPartialQuery.getWhereFilter()).andReturn(whereFilter).anyTimes();
+ EasyMock.expect(mockRel.getPartialDruidQuery()).andReturn(mockPartialQuery).anyTimes();
+ EasyMock.replay(mockRel, mockPartialQuery);
+ return mockRel;
+ }
+
+ private static Project mockProject(final boolean mapping)
+ {
+ final Project mockProject = EasyMock.mock(Project.class);
+ EasyMock.expect(mockProject.isMapping()).andReturn(mapping).anyTimes();
+ EasyMock.replay(mockProject);
+ return mockProject;
+ }
+
+ private static Filter mockFilter()
+ {
+ return EasyMock.mock(Filter.class);
+ }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java
new file mode 100644
index 0000000..dd706ff
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rule;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+public class DruidJoinRuleTest
+{
+ private final RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
+
+ private final RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE);
+
+ private final RelDataType leftType =
+ new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE).createStructType(
+ ImmutableList.of(typeFactory.createSqlType(SqlTypeName.VARCHAR)),
+ ImmutableList.of("left")
+ );
+
+ private final RelDataType joinType =
+ new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE).createStructType(
+ ImmutableList.of(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR),
+ typeFactory.createSqlType(SqlTypeName.VARCHAR)
+ ),
+ ImmutableList.of("left", "right")
+ );
+
+ @Test
+ public void test_canHandleCondition_leftEqRight()
+ {
+ Assert.assertTrue(
+ DruidJoinRule.canHandleCondition(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeInputRef(joinType, 0),
+ rexBuilder.makeInputRef(joinType, 1)
+ ),
+ leftType
+ )
+ );
+ }
+
+ @Test
+ public void test_canHandleCondition_leftFnEqRight()
+ {
+ Assert.assertTrue(
+ DruidJoinRule.canHandleCondition(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.CONCAT,
+ rexBuilder.makeLiteral("foo"),
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0)
+ ),
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
+ ),
+ leftType
+ )
+ );
+ }
+
+ @Test
+ public void test_canHandleCondition_leftEqRightFn()
+ {
+ Assert.assertFalse(
+ DruidJoinRule.canHandleCondition(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.CONCAT,
+ rexBuilder.makeLiteral("foo"),
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
+ )
+ ),
+ leftType
+ )
+ );
+ }
+
+ @Test
+ public void test_canHandleCondition_leftEqLeft()
+ {
+ Assert.assertFalse(
+ DruidJoinRule.canHandleCondition(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0)
+ ),
+ leftType
+ )
+ );
+ }
+
+ @Test
+ public void test_canHandleCondition_rightEqRight()
+ {
+ Assert.assertFalse(
+ DruidJoinRule.canHandleCondition(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1),
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
+ ),
+ leftType
+ )
+ );
+ }
+
+ @Test
+ public void test_canHandleCondition_true()
+ {
+ Assert.assertTrue(
+ DruidJoinRule.canHandleCondition(
+ rexBuilder.makeLiteral(true),
+ leftType
+ )
+ );
+ }
+
+ @Test
+ public void test_canHandleCondition_false()
+ {
+ Assert.assertTrue(
+ DruidJoinRule.canHandleCondition(
+ rexBuilder.makeLiteral(false),
+ leftType
+ )
+ );
+ }
+
+ @Test
+ public void test_decomposeAnd_notAnAnd()
+ {
+ final List<RexNode> rexNodes = DruidJoinRule.decomposeAnd(rexBuilder.makeInputRef(leftType, 0));
+
+ Assert.assertEquals(1, rexNodes.size());
+ Assert.assertEquals(rexBuilder.makeInputRef(leftType, 0), Iterables.getOnlyElement(rexNodes));
+ }
+
+ @Test
+ public void test_decomposeAnd_basic()
+ {
+ final List<RexNode> decomposed = DruidJoinRule.decomposeAnd(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.AND,
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.AND,
+ rexBuilder.makeExactLiteral(BigDecimal.valueOf(1)),
+ rexBuilder.makeExactLiteral(BigDecimal.valueOf(2))
+ ),
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.AND,
+ rexBuilder.makeExactLiteral(BigDecimal.valueOf(3)),
+ rexBuilder.makeExactLiteral(BigDecimal.valueOf(4))
+ )
+ )
+ );
+
+ Assert.assertEquals(
+ ImmutableList.of(
+ rexBuilder.makeExactLiteral(BigDecimal.valueOf(1)),
+ rexBuilder.makeExactLiteral(BigDecimal.valueOf(2)),
+ rexBuilder.makeExactLiteral(BigDecimal.valueOf(3)),
+ rexBuilder.makeExactLiteral(BigDecimal.valueOf(4))
+ ),
+ decomposed
+ );
+ }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 5c225b4..e60f831 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -118,6 +118,7 @@ import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.LookupSchema;
import org.apache.druid.sql.calcite.schema.MetadataSegmentView;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.druid.sql.calcite.view.NoopViewManager;
@@ -720,7 +721,10 @@ public class CalciteTests
.buildMMappedIndex();
- return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
+ return new SpecificSegmentsQuerySegmentWalker(
+ conglomerate,
+ INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class)
+ ).add(
DataSegment.builder()
.dataSource(DATASOURCE1)
.interval(index1.getDataInterval())
@@ -857,6 +861,10 @@ public class CalciteTests
).get(0);
}
+ public static LookupSchema createMockLookupSchema()
+ {
+ return new LookupSchema(INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class));
+ }
public static SystemSchema createMockSystemSchema(
final DruidSchema druidSchema,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 155dc41..ccb4a3d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -19,58 +19,168 @@
package org.apache.druid.sql.calcite.util;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
+import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
-import org.apache.druid.query.Druids;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.NoopQueryRunner;
+import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.QueryToolChestWarehouse;
+import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.scan.ScanQuery;
-import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.join.InlineJoinableFactory;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.Joinables;
+import org.apache.druid.segment.join.LookupJoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactory;
+import org.apache.druid.server.ClientQuerySegmentWalker;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+/**
+ * A self-contained class that executes queries similarly to the normal Druid query stack.
+ *
+ * {@link ClientQuerySegmentWalker}, the same class that Brokers use as the entry point for their query stack, is
+ * used directly. Our own class {@link DataServerLikeWalker} mimics the behavior of
+ * {@link org.apache.druid.server.coordination.ServerManager}, the entry point for Historicals. That class isn't used
+ * directly because the sheer volume of dependencies makes it quite verbose to use in a test environment.
+ */
public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, Closeable
{
private final QueryRunnerFactoryConglomerate conglomerate;
+ private final QuerySegmentWalker walker;
+ private final JoinableFactory joinableFactory;
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines = new HashMap<>();
private final List<Closeable> closeables = new ArrayList<>();
private final List<DataSegment> segments = new ArrayList<>();
- public SpecificSegmentsQuerySegmentWalker(QueryRunnerFactoryConglomerate conglomerate)
+ /**
+ * Create an instance using the provided query runner factory conglomerate and lookup provider.
+ */
+ public SpecificSegmentsQuerySegmentWalker(
+ final QueryRunnerFactoryConglomerate conglomerate,
+ final LookupExtractorFactoryContainerProvider lookupProvider
+ )
{
this.conglomerate = conglomerate;
+ this.joinableFactory = new MapJoinableFactory(
+ ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
+ .put(InlineDataSource.class, new InlineJoinableFactory())
+ .put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider))
+ .build()
+ );
+ this.walker = new ClientQuerySegmentWalker(
+ new NoopServiceEmitter(),
+ new DataServerLikeWalker(),
+ new QueryToolChestWarehouse()
+ {
+ @Override
+ public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
+ {
+ return conglomerate.findFactory(query).getToolchest();
+ }
+ },
+ new RetryQueryRunnerConfig(),
+ TestHelper.makeJsonMapper(),
+ new ServerConfig(),
+ null /* Cache */,
+ new CacheConfig()
+ {
+ @Override
+ public boolean isPopulateCache()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isUseCache()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isPopulateResultLevelCache()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isUseResultLevelCache()
+ {
+ return false;
+ }
+ }
+ );
+ }
+
+ /**
+ * Create an instance without any lookups.
+ */
+ public SpecificSegmentsQuerySegmentWalker(final QueryRunnerFactoryConglomerate conglomerate)
+ {
+ this(
+ conglomerate,
+ new LookupExtractorFactoryContainerProvider()
+ {
+ @Override
+ public Set<String> getAllLookupNames()
+ {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Optional<LookupExtractorFactoryContainer> get(String lookupName)
+ {
+ return Optional.empty();
+ }
+ }
+ );
}
public SpecificSegmentsQuerySegmentWalker add(
@@ -79,8 +189,10 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
)
{
final Segment segment = new QueryableIndexSegment(index, descriptor.getId());
- final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines
- .computeIfAbsent(descriptor.getDataSource(), datasource -> new VersionedIntervalTimeline<>(Ordering.natural()));
+ final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.computeIfAbsent(
+ descriptor.getDataSource(),
+ datasource -> new VersionedIntervalTimeline<>(Ordering.natural())
+ );
timeline.add(
descriptor.getInterval(),
descriptor.getVersion(),
@@ -102,70 +214,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
final Iterable<Interval> intervals
)
{
- Query<T> newQuery = query;
- if (query instanceof ScanQuery && ((ScanQuery) query).getOrder() != ScanQuery.Order.NONE) {
- newQuery = (Query<T>) Druids.ScanQueryBuilder.copy((ScanQuery) query)
- .intervals(new MultipleSpecificSegmentSpec(ImmutableList.of()))
- .build();
- }
-
- final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(newQuery);
- if (factory == null) {
- throw new ISE("Unknown query type[%s].", newQuery.getClass());
- }
-
- final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
-
- return new FinalizeResultsQueryRunner<>(
- toolChest.postMergeQueryDecoration(
- toolChest.mergeResults(
- toolChest.preMergeQueryDecoration(
- (queryPlus, responseContext) -> {
- Query<T> query1 = queryPlus.getQuery();
- Query<T> newQuery1 = query1;
- if (query instanceof ScanQuery && ((ScanQuery) query).getOrder() != ScanQuery.Order.NONE) {
- newQuery1 = (Query<T>) Druids.ScanQueryBuilder.copy((ScanQuery) query)
- .intervals(new MultipleSpecificSegmentSpec(
- ImmutableList.of(new SegmentDescriptor(
- Intervals.of("2015-04-12/2015-04-13"),
- "4",
- 0
- ))))
- .context(ImmutableMap.of(
- ScanQuery.CTX_KEY_OUTERMOST,
- false
- ))
- .build();
- }
- final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = getTimelineForTableDataSource(
- newQuery1);
- return makeBaseRunner(
- newQuery1,
- toolChest,
- factory,
- FunctionalIterable
- .create(intervals)
- .transformCat(
- interval -> timeline.lookup(interval)
- )
- .transformCat(
- holder -> FunctionalIterable
- .create(holder.getObject())
- .transform(
- chunk -> new SegmentDescriptor(
- holder.getInterval(),
- holder.getVersion(),
- chunk.getChunkNumber()
- )
- )
- )
- ).run(QueryPlus.wrap(newQuery1), responseContext);
- }
- )
- )
- ),
- toolChest
- );
+ return walker.getQueryRunnerForIntervals(query, intervals);
}
@Override
@@ -174,23 +223,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
final Iterable<SegmentDescriptor> specs
)
{
- final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
- if (factory == null) {
- throw new ISE("Unknown query type[%s].", query.getClass());
- }
-
- final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
-
- return new FinalizeResultsQueryRunner<>(
- toolChest.postMergeQueryDecoration(
- toolChest.mergeResults(
- toolChest.preMergeQueryDecoration(
- makeBaseRunner(query, toolChest, factory, specs)
- )
- )
- ),
- toolChest
- );
+ return walker.getQueryRunnerForSegments(query, specs);
}
@Override
@@ -201,52 +234,182 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
}
}
- private <T> VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimelineForTableDataSource(Query<T> query)
+ private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Interval interval)
{
- if (query.getDataSource() instanceof TableDataSource) {
- return timelines.get(((TableDataSource) query.getDataSource()).getName());
+ final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
+
+ if (timeline == null) {
+ return Collections.emptyList();
} else {
- throw new UOE("DataSource type[%s] unsupported", query.getDataSource().getClass().getName());
+ final List<WindowedSegment> retVal = new ArrayList<>();
+
+ for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : timeline.lookup(interval)) {
+ for (PartitionChunk<ReferenceCountingSegment> chunk : holder.getObject()) {
+ retVal.add(new WindowedSegment(chunk.getObject(), holder.getInterval()));
+ }
+ }
+
+ return retVal;
}
}
- private <T> QueryRunner<T> makeBaseRunner(
- final Query<T> query,
- final QueryToolChest<T, Query<T>> toolChest,
- final QueryRunnerFactory<T, Query<T>> factory,
- final Iterable<SegmentDescriptor> specs
- )
+ private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Iterable<SegmentDescriptor> specs)
{
- final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = getTimelineForTableDataSource(query);
+ final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
+
if (timeline == null) {
- return new NoopQueryRunner<>();
+ return Collections.emptyList();
+ } else {
+ final List<WindowedSegment> retVal = new ArrayList<>();
+
+ for (SegmentDescriptor spec : specs) {
+ final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
+ spec.getInterval(),
+ spec.getVersion()
+ );
+ retVal.add(new WindowedSegment(entry.getChunk(spec.getPartitionNumber()).getObject(), spec.getInterval()));
+ }
+
+ return retVal;
+ }
+ }
+
+ public static class WindowedSegment
+ {
+ private final Segment segment;
+ private final Interval interval;
+
+ public WindowedSegment(Segment segment, Interval interval)
+ {
+ this.segment = segment;
+ this.interval = interval;
+ Preconditions.checkArgument(segment.getId().getInterval().contains(interval));
+ }
+
+ public Segment getSegment()
+ {
+ return segment;
+ }
+
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ public SegmentDescriptor getDescriptor()
+ {
+ return new SegmentDescriptor(interval, segment.getId().getVersion(), segment.getId().getPartitionNum());
+ }
+ }
+
+ /**
+ * Mimics the behavior of a data server (e.g. Historical).
+ *
+ * Compare to {@link org.apache.druid.server.SegmentManager}.
+ */
+ private class DataServerLikeWalker implements QuerySegmentWalker
+ {
+ @Override
+ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
+ {
+ final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
+
+ if (!analysis.isConcreteTableBased()) {
+ throw new ISE("Cannot handle datasource: %s", query.getDataSource());
+ }
+
+ final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
+
+ FunctionalIterable<SegmentDescriptor> segmentDescriptors = FunctionalIterable
+ .create(intervals)
+ .transformCat(interval -> getSegmentsForTable(dataSourceName, interval))
+ .transform(WindowedSegment::getDescriptor);
+
+ return getQueryRunnerForSegments(query, segmentDescriptors);
}
- return new FinalizeResultsQueryRunner<>(
- toolChest.mergeResults(
- factory.mergeRunners(
- Execs.directExecutor(),
- FunctionalIterable
- .create(specs)
- .transformCat(
- descriptor -> {
- final PartitionHolder<ReferenceCountingSegment> holder = timeline.findEntry(
- descriptor.getInterval(),
- descriptor.getVersion()
- );
-
- return Iterables.transform(
- holder,
- chunk -> new SpecificSegmentQueryRunner<T>(
- factory.createRunner(chunk.getObject()),
- new SpecificSegmentSpec(descriptor)
+ @Override
+ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+ {
+ final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
+ if (factory == null) {
+ throw new ISE("Unknown query type[%s].", query.getClass());
+ }
+
+ final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
+
+ if (!analysis.isConcreteTableBased()) {
+ throw new ISE("Cannot handle datasource: %s", query.getDataSource());
+ }
+
+ final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
+
+ final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
+
+ // Make sure this query type can handle the subquery, if present.
+ if (analysis.isQuery()
+ && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
+ throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
+ }
+
+ final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
+ analysis.getPreJoinableClauses(),
+ joinableFactory,
+ new AtomicLong()
+ );
+
+ final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(
+ toolChest.postMergeQueryDecoration(
+ toolChest.mergeResults(
+ toolChest.preMergeQueryDecoration(
+ makeTableRunner(toolChest, factory, getSegmentsForTable(dataSourceName, specs), segmentMapFn)
+ )
+ )
+ ),
+ toolChest
+ );
+
+ // Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
+ // This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
+ // to function properly.
+ return (theQuery, responseContext) -> baseRunner.run(
+ theQuery.withQuery(Queries.withSpecificSegments(theQuery.getQuery(), ImmutableList.copyOf(specs))),
+ responseContext
+ );
+ }
+
+ private <T> QueryRunner<T> makeTableRunner(
+ final QueryToolChest<T, Query<T>> toolChest,
+ final QueryRunnerFactory<T, Query<T>> factory,
+ final Iterable<WindowedSegment> segments,
+ final Function<Segment, Segment> segmentMapFn
+ )
+ {
+ final List<WindowedSegment> segmentsList = Lists.newArrayList(segments);
+
+ if (segmentsList.isEmpty()) {
+ // Note: this is not correct when there's a right or full outer join going on.
+ // See https://github.com/apache/druid/issues/9229 for details.
+ return new NoopQueryRunner<>();
+ }
+
+ return new FinalizeResultsQueryRunner<>(
+ toolChest.mergeResults(
+ factory.mergeRunners(
+ Execs.directExecutor(),
+ FunctionalIterable
+ .create(segmentsList)
+ .transform(
+ segment ->
+ new SpecificSegmentQueryRunner<>(
+ factory.createRunner(segmentMapFn.apply(segment.getSegment())),
+ new SpecificSegmentSpec(segment.getDescriptor())
)
- );
- }
- )
- )
- ),
- toolChest
- );
+ )
+ )
+ ),
+ toolChest
+ );
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org