You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2020/02/01 07:51:29 UTC

[druid] branch master updated: SQL join support for lookups. (#9294)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b411443  SQL join support for lookups. (#9294)
b411443 is described below

commit b411443d228c14de746311e09db28b437c2746a5
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Fri Jan 31 23:51:16 2020 -0800

    SQL join support for lookups. (#9294)
    
    * SQL join support for lookups.
    
    1) Add LookupSchema to SQL, so lookups show up in the catalog.
    2) Add join-related rels and rules to SQL, allowing joins to be planned into
       native Druid queries.
    
    * Add two missing LookupSchema calls in tests.
    
    * Fix tests.
    
    * Fix typo.
---
 .../apache/druid/benchmark/query/SqlBenchmark.java |   1 +
 .../benchmark/query/SqlVsNativeBenchmark.java      |   1 +
 .../sql/TDigestSketchSqlAggregatorTest.java        |   1 +
 .../hll/sql/HllSketchSqlAggregatorTest.java        |   1 +
 .../sql/DoublesSketchSqlAggregatorTest.java        |   1 +
 .../theta/sql/ThetaSketchSqlAggregatorTest.java    |   1 +
 .../bloom/sql/BloomFilterSqlAggregatorTest.java    |   1 +
 ...dBucketsHistogramQuantileSqlAggregatorTest.java |   1 +
 .../histogram/sql/QuantileSqlAggregatorTest.java   |   1 +
 .../variance/sql/VarianceSqlAggregatorTest.java    |   1 +
 .../druid/segment/join/MapJoinableFactory.java     |   2 +-
 .../druid/server/ClientQuerySegmentWalker.java     |  29 +-
 .../apache/druid/sql/calcite/planner/Calcites.java |  31 +-
 .../druid/sql/calcite/planner/PlannerFactory.java  |   5 +
 .../apache/druid/sql/calcite/planner/Rules.java    |   6 +-
 .../druid/sql/calcite/rel/DruidJoinQueryRel.java   | 344 ++++++++++++++
 .../druid/sql/calcite/rel/DruidOuterQueryRel.java  |   2 +-
 .../apache/druid/sql/calcite/rel/DruidQuery.java   | 126 +++--
 .../druid/sql/calcite/rel/DruidQueryRel.java       |   2 +-
 .../apache/druid/sql/calcite/rel/DruidRels.java    |  76 +++
 .../druid/sql/calcite/rel/PartialDruidQuery.java   |  11 +-
 .../apache/druid/sql/calcite/rel/Projection.java   |   2 +-
 .../sql/calcite/rel/VirtualColumnRegistry.java     |   2 +-
 .../druid/sql/calcite/rule/DruidJoinRule.java      | 163 +++++++
 .../apache/druid/sql/calcite/rule/DruidRules.java  |   3 +-
 .../druid/sql/calcite/schema/LookupSchema.java     |  67 +++
 .../druid/sql/avatica/DruidAvaticaHandlerTest.java |   2 +
 .../druid/sql/avatica/DruidStatementTest.java      |   1 +
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |  67 ++-
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 526 +++++++++++++++++++--
 .../druid/sql/calcite/http/SqlResourceTest.java    |   1 +
 .../druid/sql/calcite/planner/CalcitesTest.java    |  18 +-
 .../druid/sql/calcite/rel/DruidRelsTest.java       | 241 ++++++++++
 .../druid/sql/calcite/rule/DruidJoinRuleTest.java  | 203 ++++++++
 .../druid/sql/calcite/util/CalciteTests.java       |  10 +-
 .../util/SpecificSegmentsQuerySegmentWalker.java   | 419 +++++++++++-----
 36 files changed, 2112 insertions(+), 257 deletions(-)

diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index f51684a..0a4d2a2 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -198,6 +198,7 @@ public class SqlBenchmark
 
     plannerFactory = new PlannerFactory(
         druidSchema,
+        CalciteTests.createMockLookupSchema(),
         systemSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate.lhs),
         CalciteTests.createOperatorTable(),
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
index 043405d..74f5b51 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
@@ -121,6 +121,7 @@ public class SqlVsNativeBenchmark
 
     plannerFactory = new PlannerFactory(
         druidSchema,
+        CalciteTests.createMockLookupSchema(),
         systemSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         CalciteTests.createOperatorTable(),
diff --git a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java
index 8056a10..0eb4571 100644
--- a/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java
+++ b/extensions-contrib/tdigestsketch/src/test/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchSqlAggregatorTest.java
@@ -160,6 +160,7 @@ public class TDigestSketchSqlAggregatorTest extends CalciteTestBase
     sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
         new PlannerFactory(
             druidSchema,
+            CalciteTests.createMockLookupSchema(),
             systemSchema,
             CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
             operatorTable,
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
index 77bf27c..d5c1cda 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
@@ -188,6 +188,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
     sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
         new PlannerFactory(
             druidSchema,
+            CalciteTests.createMockLookupSchema(),
             systemSchema,
             CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
             operatorTable,
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
index 47711ef..6d6c5ed 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
@@ -189,6 +189,7 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
     sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
         new PlannerFactory(
             druidSchema,
+            CalciteTests.createMockLookupSchema(),
             systemSchema,
             CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
             operatorTable,
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
index f4ef3b7..9ff3a60 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java
@@ -186,6 +186,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
     sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
         new PlannerFactory(
             druidSchema,
+            CalciteTests.createMockLookupSchema(),
             systemSchema,
             CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
             operatorTable,
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
index 9ca5885..92dba66 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
@@ -202,6 +202,7 @@ public class BloomFilterSqlAggregatorTest extends InitializedNullHandlingTest
     sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
         new PlannerFactory(
             druidSchema,
+            CalciteTests.createMockLookupSchema(),
             systemSchema,
             CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
             operatorTable,
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java
index bb9d3d0..3d00aa8 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java
@@ -172,6 +172,7 @@ public class FixedBucketsHistogramQuantileSqlAggregatorTest extends CalciteTestB
     sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
         new PlannerFactory(
             druidSchema,
+            CalciteTests.createMockLookupSchema(),
             systemSchema,
             CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
             operatorTable,
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
index be90e8f..6b0e308 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
@@ -171,6 +171,7 @@ public class QuantileSqlAggregatorTest extends CalciteTestBase
     sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
       new PlannerFactory(
           druidSchema,
+          CalciteTests.createMockLookupSchema(),
           systemSchema,
           CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
           operatorTable,
diff --git a/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java b/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java
index a061f47..0247c88 100644
--- a/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java
+++ b/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java
@@ -174,6 +174,7 @@ public class VarianceSqlAggregatorTest extends InitializedNullHandlingTest
     sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
         new PlannerFactory(
             druidSchema,
+            CalciteTests.createMockLookupSchema(),
             systemSchema,
             CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
             operatorTable,
diff --git a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
index 5575348..beb8106 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java
@@ -36,7 +36,7 @@ public class MapJoinableFactory implements JoinableFactory
   private final Map<Class<? extends DataSource>, JoinableFactory> joinableFactories;
 
   @Inject
-  MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories)
+  public MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories)
   {
     // Accesses to IdentityHashMap should be faster than to HashMap or ImmutableMap.
     // Class doesn't override Object.equals().
diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index b484486..27bccaf 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -48,7 +48,7 @@ import org.joda.time.Interval;
 public class ClientQuerySegmentWalker implements QuerySegmentWalker
 {
   private final ServiceEmitter emitter;
-  private final CachingClusteredClient baseClient;
+  private final QuerySegmentWalker baseClient;
   private final QueryToolChestWarehouse warehouse;
   private final RetryQueryRunnerConfig retryConfig;
   private final ObjectMapper objectMapper;
@@ -56,10 +56,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
   private final Cache cache;
   private final CacheConfig cacheConfig;
 
-  @Inject
   public ClientQuerySegmentWalker(
       ServiceEmitter emitter,
-      CachingClusteredClient baseClient,
+      QuerySegmentWalker baseClient,
       QueryToolChestWarehouse warehouse,
       RetryQueryRunnerConfig retryConfig,
       ObjectMapper objectMapper,
@@ -78,6 +77,30 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
     this.cacheConfig = cacheConfig;
   }
 
+  @Inject
+  ClientQuerySegmentWalker(
+      ServiceEmitter emitter,
+      CachingClusteredClient baseClient,
+      QueryToolChestWarehouse warehouse,
+      RetryQueryRunnerConfig retryConfig,
+      ObjectMapper objectMapper,
+      ServerConfig serverConfig,
+      Cache cache,
+      CacheConfig cacheConfig
+  )
+  {
+    this(
+        emitter,
+        (QuerySegmentWalker) baseClient,
+        warehouse,
+        retryConfig,
+        objectMapper,
+        serverConfig,
+        cache,
+        cacheConfig
+    );
+  }
+
   @Override
   public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
   {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
index 81f45f2..9eab11a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
@@ -20,6 +20,7 @@
 package org.apache.druid.sql.calcite.planner;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.io.BaseEncoding;
 import com.google.common.primitives.Chars;
 import org.apache.calcite.jdbc.CalciteSchema;
@@ -46,6 +47,7 @@ import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
 import org.apache.druid.sql.calcite.schema.InformationSchema;
+import org.apache.druid.sql.calcite.schema.LookupSchema;
 import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -55,8 +57,10 @@ import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.DateTimeFormatterBuilder;
 import org.joda.time.format.ISODateTimeFormat;
 
+import javax.annotation.Nullable;
 import java.nio.charset.Charset;
 import java.util.NavigableSet;
+import java.util.TreeSet;
 import java.util.regex.Pattern;
 
 /**
@@ -104,12 +108,14 @@ public class Calcites
 
   public static SchemaPlus createRootSchema(
       final DruidSchema druidSchema,
+      final LookupSchema lookupSchema,
       final SystemSchema systemSchema,
       final AuthorizerMapper authorizerMapper
   )
   {
     final SchemaPlus rootSchema = CalciteSchema.createRootSchema(false, false).plus();
     rootSchema.add(DruidSchema.NAME, druidSchema);
+    rootSchema.add(LookupSchema.NAME, lookupSchema);
     rootSchema.add(InformationSchema.NAME, new InformationSchema(rootSchema, authorizerMapper));
     rootSchema.add(SystemSchema.NAME, systemSchema);
     return rootSchema;
@@ -137,6 +143,7 @@ public class Calcites
 
   }
 
+  @Nullable
   public static ValueType getValueTypeForSqlTypeName(SqlTypeName sqlTypeName)
   {
     if (SqlTypeName.FLOAT == sqlTypeName) {
@@ -345,23 +352,23 @@ public class Calcites
   }
 
   /**
-   * Checks if a RexNode is a literal int or not. If this returns true, then {@code RexLiteral.intValue(literal)} can be
-   * used to get the value of the literal.
-   *
-   * @param rexNode the node
-   *
-   * @return true if this is an int
+   * Find a string that is either equal to "basePrefix", or basePrefix prepended by underscores, and where nothing in
+   * "strings" starts with prefix plus a digit.
    */
-  public static boolean isIntLiteral(final RexNode rexNode)
+  public static String findUnusedPrefixForDigits(final String basePrefix, final Iterable<String> strings)
   {
-    return rexNode instanceof RexLiteral && SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName());
-  }
+    final NavigableSet<String> navigableStrings;
+
+    if (strings instanceof NavigableSet) {
+      navigableStrings = (NavigableSet<String>) strings;
+    } else {
+      navigableStrings = new TreeSet<>();
+      Iterables.addAll(navigableStrings, strings);
+    }
 
-  public static String findUnusedPrefix(final String basePrefix, final NavigableSet<String> strings)
-  {
     String prefix = basePrefix;
 
-    while (!isUnusedPrefix(prefix, strings)) {
+    while (!isUnusedPrefix(prefix, navigableStrings)) {
       prefix = "_" + prefix;
     }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
index 3d5f2fa..135a621 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
@@ -42,6 +42,7 @@ import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.sql.calcite.rel.QueryMaker;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.LookupSchema;
 import org.apache.druid.sql.calcite.schema.SystemSchema;
 
 import java.util.Map;
@@ -59,6 +60,7 @@ public class PlannerFactory
       .build();
 
   private final DruidSchema druidSchema;
+  private final LookupSchema lookupSchema;
   private final SystemSchema systemSchema;
   private final QueryLifecycleFactory queryLifecycleFactory;
   private final DruidOperatorTable operatorTable;
@@ -70,6 +72,7 @@ public class PlannerFactory
   @Inject
   public PlannerFactory(
       final DruidSchema druidSchema,
+      final LookupSchema lookupSchema,
       final SystemSchema systemSchema,
       final QueryLifecycleFactory queryLifecycleFactory,
       final DruidOperatorTable operatorTable,
@@ -80,6 +83,7 @@ public class PlannerFactory
   )
   {
     this.druidSchema = druidSchema;
+    this.lookupSchema = lookupSchema;
     this.systemSchema = systemSchema;
     this.queryLifecycleFactory = queryLifecycleFactory;
     this.operatorTable = operatorTable;
@@ -96,6 +100,7 @@ public class PlannerFactory
   {
     final SchemaPlus rootSchema = Calcites.createRootSchema(
         druidSchema,
+        lookupSchema,
         systemSchema,
         authorizerMapper
     );
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java
index 9463ab5..b2b3659 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java
@@ -101,7 +101,7 @@ public class Rules
           JoinPushExpressionsRule.INSTANCE,
           FilterAggregateTransposeRule.INSTANCE,
           ProjectWindowTransposeRule.INSTANCE,
-          JoinCommuteRule.INSTANCE,
+          JoinCommuteRule.SWAP_OUTER,
           JoinPushThroughJoinRule.RIGHT,
           JoinPushThroughJoinRule.LEFT,
           SortProjectTransposeRule.INSTANCE,
@@ -130,13 +130,13 @@ public class Rules
           AggregateValuesRule.INSTANCE
       );
 
-  // Rules from VolcanoPlanner's registerAbstractRelationalRules.
+  // Rules from VolcanoPlanner's registerAbstractRelationalRules, minus JoinCommuteRule since it's already
+  // in DEFAULT_RULES.
   private static final List<RelOptRule> VOLCANO_ABSTRACT_RULES =
       ImmutableList.of(
           FilterJoinRule.FILTER_ON_JOIN,
           FilterJoinRule.JOIN,
           AbstractConverter.ExpandConversionRule.INSTANCE,
-          JoinCommuteRule.INSTANCE,
           AggregateRemoveRule.INSTANCE,
           UnionToDistinctRule.INSTANCE,
           ProjectRemoveRule.INSTANCE,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
new file mode 100644
index 0000000..1142188
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rel;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.JoinDataSource;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.segment.join.JoinType;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.table.RowSignature;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * DruidRel that uses a {@link JoinDataSource}.
+ */
+public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
+{
+  private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__join__");
+  private static final double COST_FACTOR = 100.0;
+
+  private final PartialDruidQuery partialQuery;
+  private final Join joinRel;
+  private RelNode left;
+  private RelNode right;
+
+  private DruidJoinQueryRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      Join joinRel,
+      PartialDruidQuery partialQuery,
+      QueryMaker queryMaker
+  )
+  {
+    super(cluster, traitSet, queryMaker);
+    this.joinRel = joinRel;
+    this.left = joinRel.getLeft();
+    this.right = joinRel.getRight();
+    this.partialQuery = partialQuery;
+  }
+
+  public static DruidJoinQueryRel create(final Join joinRel, final QueryMaker queryMaker)
+  {
+    return new DruidJoinQueryRel(
+        joinRel.getCluster(),
+        joinRel.getTraitSet(),
+        joinRel,
+        PartialDruidQuery.create(joinRel),
+        queryMaker
+    );
+  }
+
+  @Override
+  public PartialDruidQuery getPartialDruidQuery()
+  {
+    return partialQuery;
+  }
+
+  @Override
+  public Sequence<Object[]> runQuery()
+  {
+    // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
+    // is the outermost query and it will actually get run as a native query. Druid's native query layer will
+    // finalize aggregations for the outermost query even if we don't explicitly ask it to.
+
+    final DruidQuery query = toDruidQuery(false);
+    return getQueryMaker().runQuery(query);
+  }
+
+  @Override
+  public DruidJoinQueryRel withPartialQuery(final PartialDruidQuery newQueryBuilder)
+  {
+    return new DruidJoinQueryRel(
+        getCluster(),
+        getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
+        joinRel,
+        newQueryBuilder,
+        getQueryMaker()
+    );
+  }
+
+  @Override
+  public int getQueryCount()
+  {
+    return ((DruidRel<?>) left).getQueryCount() + ((DruidRel<?>) right).getQueryCount();
+  }
+
+  @Override
+  public DruidQuery toDruidQuery(final boolean finalizeAggregations)
+  {
+    final DruidRel<?> leftDruidRel = (DruidRel<?>) left;
+    final DruidQuery leftQuery = Preconditions.checkNotNull((leftDruidRel).toDruidQuery(false), "leftQuery");
+    final RowSignature leftSignature = leftQuery.getOutputRowSignature();
+    final DataSource leftDataSource;
+
+    final DruidRel<?> rightDruidRel = (DruidRel<?>) right;
+    final DruidQuery rightQuery = Preconditions.checkNotNull(rightDruidRel.toDruidQuery(false), "rightQuery");
+    final RowSignature rightSignature = rightQuery.getOutputRowSignature();
+    final DataSource rightDataSource;
+
+    // Left rel: allow direct embedding of scans/mappings including those of joins.
+    if (DruidRels.isScanOrMapping(leftDruidRel, true)) {
+      leftDataSource = leftQuery.getDataSource();
+    } else {
+      leftDataSource = new QueryDataSource(leftQuery.getQuery());
+    }
+
+    // Right rel: allow direct embedding of scans/mappings, excluding joins (those must be done as subqueries).
+    if (DruidRels.isScanOrMapping(rightDruidRel, false)) {
+      rightDataSource = rightQuery.getDataSource();
+    } else {
+      rightDataSource = new QueryDataSource(rightQuery.getQuery());
+    }
+
+    final Pair<String, RowSignature> prefixSignaturePair = computeJoinRowSignature(leftSignature, rightSignature);
+
+    // Generate the condition for this join as a Druid expression.
+    final DruidExpression condition = Expressions.toDruidExpression(
+        getPlannerContext(),
+        prefixSignaturePair.rhs,
+        joinRel.getCondition()
+    );
+
+    // DruidJoinRule should not have created us if "condition" is null. Check defensively anyway, which also
+    // quiets static code analysis.
+    if (condition == null) {
+      throw new CannotBuildQueryException(joinRel, joinRel.getCondition());
+    }
+
+    return partialQuery.build(
+        JoinDataSource.create(
+            leftDataSource,
+            rightDataSource,
+            prefixSignaturePair.lhs,
+            condition.getExpression(),
+            toDruidJoinType(joinRel.getJoinType()),
+            getPlannerContext().getExprMacroTable()
+        ),
+        prefixSignaturePair.rhs,
+        getPlannerContext(),
+        getCluster().getRexBuilder(),
+        finalizeAggregations
+    );
+  }
+
+  @Override
+  public DruidQuery toDruidQueryForExplaining()
+  {
+    return partialQuery.build(
+        DUMMY_DATA_SOURCE,
+        RowSignature.from(
+            joinRel.getRowType().getFieldNames(),
+            joinRel.getRowType()
+        ),
+        getPlannerContext(),
+        getCluster().getRexBuilder(),
+        false
+    );
+  }
+
+  @Override
+  public DruidJoinQueryRel asDruidConvention()
+  {
+    return new DruidJoinQueryRel(
+        getCluster(),
+        getTraitSet().replace(DruidConvention.instance()),
+        joinRel.copy(
+            joinRel.getTraitSet(),
+            joinRel.getInputs()
+                   .stream()
+                   .map(input -> RelOptRule.convert(input, DruidConvention.instance()))
+                   .collect(Collectors.toList())
+        ),
+        partialQuery,
+        getQueryMaker()
+    );
+  }
+
+  @Override
+  public List<RelNode> getInputs()
+  {
+    return ImmutableList.of(left, right);
+  }
+
+  @Override
+  public void replaceInput(int ordinalInParent, RelNode p)
+  {
+    joinRel.replaceInput(ordinalInParent, p);
+
+    if (ordinalInParent == 0) {
+      this.left = p;
+    } else if (ordinalInParent == 1) {
+      this.right = p;
+    } else {
+      throw new IndexOutOfBoundsException(StringUtils.format("Invalid ordinalInParent[%s]", ordinalInParent));
+    }
+  }
+
+  @Override
+  public List<RexNode> getChildExps()
+  {
+    return ImmutableList.of(joinRel.getCondition());
+  }
+
+  @Override
+  public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs)
+  {
+    return new DruidJoinQueryRel(
+        getCluster(),
+        traitSet,
+        joinRel.copy(joinRel.getTraitSet(), inputs),
+        getPartialDruidQuery(),
+        getQueryMaker()
+    );
+  }
+
+  @Override
+  public Set<String> getDataSourceNames()
+  {
+    final Set<String> retVal = new HashSet<>();
+    retVal.addAll(((DruidRel<?>) left).getDataSourceNames());
+    retVal.addAll(((DruidRel<?>) right).getDataSourceNames());
+    return retVal;
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw)
+  {
+    final String queryString;
+    final DruidQuery druidQuery = toDruidQueryForExplaining();
+
+    try {
+      queryString = getQueryMaker().getJsonMapper().writeValueAsString(druidQuery.getQuery());
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+
+    return pw.input("left", left)
+             .input("right", right)
+             .item("condition", joinRel.getCondition())
+             .item("joinType", joinRel.getJoinType())
+             .item("query", queryString)
+             .item("signature", druidQuery.getOutputRowSignature());
+  }
+
+  @Override
+  protected RelDataType deriveRowType()
+  {
+    return partialQuery.getRowType();
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
+  {
+    return planner.getCostFactory()
+                  .makeCost(mq.getRowCount(left), 0, 0)
+                  .plus(planner.getCostFactory().makeCost(mq.getRowCount(right), 0, 0))
+                  .multiplyBy(COST_FACTOR);
+  }
+
+  private static JoinType toDruidJoinType(JoinRelType calciteJoinType)
+  {
+    switch (calciteJoinType) {
+      case LEFT:
+        return JoinType.LEFT;
+      case RIGHT:
+        return JoinType.RIGHT;
+      case FULL:
+        return JoinType.FULL;
+      case INNER:
+        return JoinType.INNER;
+      default:
+        throw new IAE("Cannot handle joinType[%s]", calciteJoinType);
+    }
+  }
+
+  /**
+   * Returns a Pair of "rightPrefix" (for JoinDataSource) and the signature of rows that will result from
+   * applying that prefix.
+   */
+  private static Pair<String, RowSignature> computeJoinRowSignature(
+      final RowSignature leftSignature,
+      final RowSignature rightSignature
+  )
+  {
+    final RowSignature.Builder signatureBuilder = RowSignature.builder();
+
+    for (final String column : leftSignature.getRowOrder()) {
+      signatureBuilder.add(column, leftSignature.getColumnType(column));
+    }
+
+    // Need to include the "0" since findUnusedPrefixForDigits only guarantees safety for digit-initiated suffixes
+    final String rightPrefix = Calcites.findUnusedPrefixForDigits("j", leftSignature.getRowOrder()) + "0.";
+
+    for (final String column : rightSignature.getRowOrder()) {
+      signatureBuilder.add(rightPrefix + column, rightSignature.getColumnType(column));
+    }
+
+    return Pair.of(rightPrefix, signatureBuilder.build());
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java
index 934361a..d80bca6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java
@@ -44,7 +44,7 @@ import java.util.List;
 import java.util.Set;
 
 /**
- * DruidRel that uses a "query" dataSource.
+ * DruidRel that uses a {@link QueryDataSource}.
  */
 public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
 {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index b976960..92392cf 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -117,7 +117,32 @@ public class DruidQuery
   private final RelDataType outputRowType;
   private final VirtualColumnRegistry virtualColumnRegistry;
 
-  public DruidQuery(
+  private DruidQuery( // private: fromPartialQuery builds instances through this constructor
+      final DataSource dataSource,
+      final PlannerContext plannerContext,
+      @Nullable final DimFilter filter,
+      @Nullable final Projection selectProjection,
+      @Nullable final Grouping grouping,
+      @Nullable final Sorting sorting,
+      final RowSignature sourceRowSignature,
+      final RelDataType outputRowType,
+      final VirtualColumnRegistry virtualColumnRegistry
+  )
+  {
+    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+    this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext");
+    this.filter = filter; // optional: may be null
+    this.selectProjection = selectProjection; // optional: may be null
+    this.grouping = grouping; // optional: may be null
+    this.sorting = sorting; // optional: may be null
+    this.sourceRowSignature = Preconditions.checkNotNull(sourceRowSignature, "sourceRowSignature");
+    this.outputRowSignature = computeOutputRowSignature(sourceRowSignature, selectProjection, grouping, sorting);
+    this.outputRowType = Preconditions.checkNotNull(outputRowType, "outputRowType");
+    this.virtualColumnRegistry = Preconditions.checkNotNull(virtualColumnRegistry, "virtualColumnRegistry");
+    this.query = computeQuery(); // NOTE(review): kept as the last step; presumably reads the fields set above
+  }
+
+  public static DruidQuery fromPartialQuery(
       final PartialDruidQuery partialQuery,
       final DataSource dataSource,
       final RowSignature sourceRowSignature,
@@ -126,15 +151,17 @@ public class DruidQuery
       final boolean finalizeAggregations
   )
   {
-    this.dataSource = dataSource;
-    this.outputRowType = partialQuery.leafRel().getRowType();
-    this.sourceRowSignature = sourceRowSignature;
-    this.virtualColumnRegistry = VirtualColumnRegistry.create(sourceRowSignature);
-    this.plannerContext = plannerContext;
+    final RelDataType outputRowType = partialQuery.leafRel().getRowType();
+    final VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(sourceRowSignature);
 
     // Now the fun begins.
+    final DimFilter filter;
+    final Projection selectProjection;
+    final Grouping grouping;
+    final Sorting sorting;
+
     if (partialQuery.getWhereFilter() != null) {
-      this.filter = Preconditions.checkNotNull(
+      filter = Preconditions.checkNotNull(
           computeWhereFilter(
               partialQuery,
               plannerContext,
@@ -143,55 +170,64 @@ public class DruidQuery
           )
       );
     } else {
-      this.filter = null;
+      filter = null;
     }
 
     // Only compute "selectProjection" if this is a non-aggregating query. (For aggregating queries, "grouping" will
     // reflect select-project from partialQuery on its own.)
     if (partialQuery.getSelectProject() != null && partialQuery.getAggregate() == null) {
-      this.selectProjection = Preconditions.checkNotNull(
+      selectProjection = Preconditions.checkNotNull(
           computeSelectProjection(
               partialQuery,
               plannerContext,
-              computeOutputRowSignature(),
+              computeOutputRowSignature(sourceRowSignature, null, null, null),
               virtualColumnRegistry
           )
       );
     } else {
-      this.selectProjection = null;
+      selectProjection = null;
     }
 
     if (partialQuery.getAggregate() != null) {
-      this.grouping = Preconditions.checkNotNull(
+      grouping = Preconditions.checkNotNull(
           computeGrouping(
               partialQuery,
               plannerContext,
-              computeOutputRowSignature(),
+              computeOutputRowSignature(sourceRowSignature, selectProjection, null, null),
               virtualColumnRegistry,
               rexBuilder,
               finalizeAggregations
           )
       );
     } else {
-      this.grouping = null;
+      grouping = null;
     }
 
     if (partialQuery.getSort() != null) {
-      this.sorting = Preconditions.checkNotNull(
+      sorting = Preconditions.checkNotNull(
           computeSorting(
               partialQuery,
               plannerContext,
-              computeOutputRowSignature(),
+              computeOutputRowSignature(sourceRowSignature, selectProjection, grouping, null),
               // When sorting follows grouping, virtual columns cannot be used
               partialQuery.getAggregate() != null ? null : virtualColumnRegistry
           )
       );
     } else {
-      this.sorting = null;
+      sorting = null;
     }
 
-    this.outputRowSignature = computeOutputRowSignature();
-    this.query = computeQuery();
+    return new DruidQuery(
+        dataSource,
+        plannerContext,
+        filter,
+        selectProjection,
+        grouping,
+        sorting,
+        sourceRowSignature,
+        outputRowType,
+        virtualColumnRegistry
+    );
   }
 
   @Nonnull
@@ -357,7 +393,7 @@ public class DruidQuery
   {
     final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
     final List<DimensionExpression> dimensions = new ArrayList<>();
-    final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(rowSignature.getRowOrder()));
+    final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("d", new TreeSet<>(rowSignature.getRowOrder()));
     int outputNameCounter = 0;
 
     for (int i : aggregate.getGroupSet()) {
@@ -426,7 +462,7 @@ public class DruidQuery
   {
     final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
     final List<Aggregation> aggregations = new ArrayList<>();
-    final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(rowSignature.getRowOrder()));
+    final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("a", new TreeSet<>(rowSignature.getRowOrder()));
 
     for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
       final String aggName = outputNamePrefix + i;
@@ -525,6 +561,29 @@ public class DruidQuery
     return Sorting.create(orderBys, limit, projection);
   }
 
+  /**
+   * Determine the {@link RowSignature} of a query assembled from the given pieces. Precedence, highest first:
+   * the sorting projection, then grouping, then the select projection, and finally the source signature.
+   */
+  private static RowSignature computeOutputRowSignature(
+      final RowSignature sourceRowSignature,
+      @Nullable final Projection selectProjection,
+      @Nullable final Grouping grouping,
+      @Nullable final Sorting sorting
+  )
+  {
+    if (sorting != null && sorting.getProjection() != null) {
+      return sorting.getProjection().getOutputRowSignature();
+    }
+    if (grouping != null) {
+      // Sanity check: cannot have both "grouping" and "selectProjection".
+      Preconditions.checkState(selectProjection == null, "Cannot have both 'grouping' and 'selectProjection'");
+      return grouping.getOutputRowSignature();
+    }
+
+    return selectProjection != null ? selectProjection.getOutputRowSignature() : sourceRowSignature;
+  }
+
   private VirtualColumns getVirtualColumns(final boolean includeDimensions)
   {
     // 'sourceRowSignature' could provide a list of all defined virtual columns while constructing a query, but we
@@ -570,6 +629,11 @@ public class DruidQuery
     return VirtualColumns.create(columns);
   }
 
+  public DataSource getDataSource()
+  {
+    return dataSource; // the DataSource this query was constructed with
+  }
+
   @Nullable
   public Grouping getGrouping()
   {
@@ -592,26 +656,6 @@ public class DruidQuery
   }
 
   /**
-   * Return the {@link RowSignature} corresponding to the output of this query. This method may be called during
-   * construction, in which case it returns the output row signature at whatever phase of construction this method
-   * is called at. At the end of construction, the final result is assigned to {@link #outputRowSignature}.
-   */
-  private RowSignature computeOutputRowSignature()
-  {
-    if (sorting != null && sorting.getProjection() != null) {
-      return sorting.getProjection().getOutputRowSignature();
-    } else if (grouping != null) {
-      // Sanity check: cannot have both "grouping" and "selectProjection".
-      Preconditions.checkState(selectProjection == null, "Cannot have both 'grouping' and 'selectProjection'");
-      return grouping.getOutputRowSignature();
-    } else if (selectProjection != null) {
-      return selectProjection.getOutputRowSignature();
-    } else {
-      return sourceRowSignature;
-    }
-  }
-
-  /**
    * Return this query as some kind of Druid query. The returned query will either be {@link TopNQuery},
    * {@link TimeseriesQuery}, {@link GroupByQuery}, {@link ScanQuery}
    *
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java
index fed6d88..a6fa53c 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java
@@ -38,7 +38,7 @@ import javax.annotation.Nonnull;
 import java.util.Set;
 
 /**
- * DruidRel that uses a "table" dataSource.
+ * DruidRel that operates on top of a {@link DruidTable} directly (no joining or subqueries).
  */
 public class DruidQueryRel extends DruidRel<DruidQueryRel>
 {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java
new file mode 100644
index 0000000..2ca30d8
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rel;
+
+import org.apache.druid.query.DataSource;
+import org.apache.druid.sql.calcite.table.DruidTable;
+
+import java.util.Optional;
+
+public class DruidRels
+{
+  /**
+   * Returns the DataSource behind a leaf {@link DruidQueryRel}, or empty if there is none to report.
+   */
+  public static Optional<DataSource> dataSourceIfLeafRel(final DruidRel<?> druidRel)
+  {
+    if (!(druidRel instanceof DruidQueryRel)) {
+      return Optional.empty();
+    }
+    // unwrap() yields null when the table is not a DruidTable; report "empty" instead of throwing an NPE.
+    return Optional.ofNullable(druidRel.getTable().unwrap(DruidTable.class)).map(DruidTable::getDataSource);
+  }
+
+  /**
+   * Tells whether a druidRel is a plain table scan, optionally topped by a projection that only remaps columns.
+   * Stricter than {@link #isScanOrProject}: projections that transform their inputs are rejected.
+   *
+   * @param druidRel  the rel to check
+   * @param canBeJoin consider a 'join' that doesn't do anything fancy to be a scan-or-mapping too.
+   */
+  public static boolean isScanOrMapping(final DruidRel<?> druidRel, final boolean canBeJoin)
+  {
+    if (!isScanOrProject(druidRel, canBeJoin)) {
+      return false;
+    }
+
+    // A projection, when present, must be a pure column mapping.
+    final PartialDruidQuery query = druidRel.getPartialDruidQuery();
+    return query.getSelectProject() == null || query.getSelectProject().isMapping();
+  }
+
+  /**
+   * Check if a druidRel is a simple table scan or a scan + projection, with no WHERE filter attached.
+   *
+   * @param druidRel  the rel to check
+   * @param canBeJoin consider a 'join' that doesn't do anything fancy to be a scan-or-project too.
+   */
+  private static boolean isScanOrProject(final DruidRel<?> druidRel, final boolean canBeJoin)
+  {
+    if (druidRel instanceof DruidQueryRel || (canBeJoin && druidRel instanceof DruidJoinQueryRel)) {
+      final PartialDruidQuery partialQuery = druidRel.getPartialDruidQuery();
+      final PartialDruidQuery.Stage stage = partialQuery.stage();
+      return (stage == PartialDruidQuery.Stage.SCAN || stage == PartialDruidQuery.Stage.SELECT_PROJECT)
+             && partialQuery.getWhereFilter() == null;
+    } else {
+      return false;
+    }
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
index e9473c6..7738c92 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
@@ -103,7 +103,7 @@ public class PartialDruidQuery
   {
     final Supplier<RelBuilder> builderSupplier = () -> RelFactories.LOGICAL_BUILDER.create(
         scanRel.getCluster(),
-        scanRel.getTable().getRelOptSchema()
+        scanRel.getTable() != null ? scanRel.getTable().getRelOptSchema() : null
     );
     return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null);
   }
@@ -303,7 +303,14 @@ public class PartialDruidQuery
       final boolean finalizeAggregations
   )
   {
-    return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder, finalizeAggregations);
+    return DruidQuery.fromPartialQuery(
+        this,
+        dataSource,
+        sourceRowSignature,
+        plannerContext,
+        rexBuilder,
+        finalizeAggregations
+    );
   }
 
   public boolean canAccept(final Stage stage)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java
index 41b2f41..16b6792 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java
@@ -197,7 +197,7 @@ public class Projection
   )
   {
     final List<String> rowOrder = new ArrayList<>();
-    final String outputNamePrefix = Calcites.findUnusedPrefix(
+    final String outputNamePrefix = Calcites.findUnusedPrefixForDigits(
         basePrefix,
         new TreeSet<>(inputRowSignature.getRowOrder())
     );
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java
index dc35391..a4a0d25 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/VirtualColumnRegistry.java
@@ -60,7 +60,7 @@ public class VirtualColumnRegistry
   {
     return new VirtualColumnRegistry(
         rowSignature,
-        Calcites.findUnusedPrefix("v", new TreeSet<>(rowSignature.getRowOrder())),
+        Calcites.findUnusedPrefixForDigits("v", new TreeSet<>(rowSignature.getRowOrder())),
         new HashMap<>(),
         new HashMap<>()
     );
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java
new file mode 100644
index 0000000..a7ba42e
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rule;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
+import org.apache.druid.sql.calcite.rel.DruidRel;
+import org.apache.druid.sql.calcite.rel.DruidRels;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+public class DruidJoinRule extends RelOptRule
+{
+  private static final DruidJoinRule INSTANCE = new DruidJoinRule();
+
+  private DruidJoinRule()
+  {
+    super(
+        operand(
+            Join.class,
+            operand(DruidRel.class, none()), // left input; read back as call.rel(1)
+            operand(DruidRel.class, none())  // right input; read back as call.rel(2)
+        )
+    );
+  }
+
+  public static DruidJoinRule instance()
+  {
+    return INSTANCE; // the shared instance held in the static INSTANCE field
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call)
+  {
+    final Join join = call.rel(0);
+    final DruidRel<?> left = call.rel(1);
+    final DruidRel<?> right = call.rel(2);
+
+    // 1) Condition must be handleable (see canHandleCondition).
+    // 2) Left must be a filter-free scan or column mapping; it may itself be a join.
+    // 3) Right must be one too, but *cannot* be a join; we want to generate left-heavy trees.
+    // 4) If left is not a join, its datasource must be concrete.
+    // 5) Right's datasource must be global.
+    return
+        canHandleCondition(join.getCondition(), join.getLeft().getRowType())
+        && DruidRels.isScanOrMapping(left, true)
+        && DruidRels.isScanOrMapping(right, false)
+        && (left instanceof DruidJoinQueryRel
+            || DruidRels.dataSourceIfLeafRel(left).filter(DataSource::isConcrete).isPresent())
+        && DruidRels.dataSourceIfLeafRel(right).filter(DataSource::isGlobal).isPresent();
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call)
+  {
+    // Preconditions were already verified in "matches", so the operands are used as-is.
+
+    final Join matchedJoin = call.rel(0);
+    final DruidRel<?> leftInput = call.rel(1);
+
+    // The replacement rel is created from the join plus the left input's QueryMaker.
+    call.transformTo(DruidJoinQueryRel.create(matchedJoin, leftInput.getQueryMaker()));
+  }
+
+  /**
+   * Returns true if this condition is an AND of equality conditions of the form: f(LeftRel) = RightColumn.
+   * Here f must read at least one left-hand field and no right-hand field. Literal sub-conditions are accepted.
+   * @see org.apache.druid.segment.join.JoinConditionAnalysis where "equiCondition" is the same concept.
+   */
+  @VisibleForTesting
+  static boolean canHandleCondition(final RexNode condition, final RelDataType leftRowType)
+  {
+    final int numLeftFields = leftRowType.getFieldList().size();
+    final ImmutableBitSet leftFields = ImmutableBitSet.range(numLeftFields);
+
+    for (RexNode subCondition : decomposeAnd(condition)) {
+      if (subCondition.isA(SqlKind.LITERAL)) {
+        // Literals are always OK.
+        continue;
+      }
+
+      if (!subCondition.isA(SqlKind.EQUALS)) {
+        // If it's not EQUALS, it's not supported.
+        return false;
+      }
+
+      final List<RexNode> operands = ((RexCall) subCondition).getOperands();
+      Preconditions.checkState(operands.size() == 2, "Expected 2 operands, got[%s]", operands.size());
+
+      final boolean rhsIsFieldOfRightRel =
+          operands.get(1).isA(SqlKind.INPUT_REF)
+          && ((RexInputRef) operands.get(1)).getIndex() >= numLeftFields;
+
+      // "intersects" alone would also admit mixed expressions such as "left.x + right.y = right.k", so require
+      // the referenced fields to be a non-empty subset of the left rel's fields.
+      final ImmutableBitSet lhsFields = RelOptUtil.InputFinder.bits(operands.get(0));
+      final boolean lhsIsExpressionOfLeftRel = !lhsFields.isEmpty() && leftFields.contains(lhsFields);
+      if (!(lhsIsExpressionOfLeftRel && rhsIsFieldOfRightRel)) {
+        // Cannot handle this condition.
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @VisibleForTesting
+  static List<RexNode> decomposeAnd(final RexNode condition)
+  {
+    // Flattens nested ANDs depth-first into their non-AND leaves, keeping the original left-to-right order.
+    final List<RexNode> leaves = new ArrayList<>();
+    final Stack<RexNode> pending = new Stack<>();
+    pending.push(condition);
+
+    while (!pending.empty()) {
+      final RexNode node = pending.pop();
+
+      if (!node.isA(SqlKind.AND)) {
+        leaves.add(node);
+        continue;
+      }
+
+      // Push operands last-to-first so that they are popped (and thus emitted) first-to-last.
+      final List<RexNode> children = ((RexCall) node).getOperands();
+      for (int idx = children.size(); idx > 0; idx--) {
+        pending.push(children.get(idx - 1));
+      }
+    }
+
+    return leaves;
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
index 21998f1..75f5038 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
@@ -90,7 +90,8 @@ public class DruidRules
         DruidOuterQueryRule.PROJECT_AGGREGATE,
         DruidOuterQueryRule.AGGREGATE_SORT_PROJECT,
         DruidUnionRule.instance(),
-        DruidSortUnionRule.instance()
+        DruidSortUnionRule.instance(),
+        DruidJoinRule.instance()
     );
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java
new file mode 100644
index 0000000..0fd414b
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.druid.query.LookupDataSource;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.join.lookup.LookupColumnSelectorFactory;
+import org.apache.druid.sql.calcite.table.DruidTable;
+import org.apache.druid.sql.calcite.table.RowSignature;
+
+import java.util.Map;
+
+/**
+ * Creates the "lookup" schema in Druid SQL, composed of all available {@link LookupDataSource}.
+ */
+public class LookupSchema extends AbstractSchema
+{
+  public static final String NAME = "lookup";
+
+  private static final RowSignature ROW_SIGNATURE =
+      RowSignature.builder()
+                  .add(LookupColumnSelectorFactory.KEY_COLUMN, ValueType.STRING)
+                  .add(LookupColumnSelectorFactory.VALUE_COLUMN, ValueType.STRING)
+                  .build();
+
+  private final LookupExtractorFactoryContainerProvider lookupProvider;
+
+  @Inject
+  public LookupSchema(final LookupExtractorFactoryContainerProvider lookupProvider)
+  {
+    this.lookupProvider = lookupProvider; // consulted for lookup names in getTableMap()
+  }
+
+  @Override
+  protected Map<String, Table> getTableMap()
+  {
+    // One table per lookup name reported by the provider; all of them share ROW_SIGNATURE.
+    final ImmutableMap.Builder<String, Table> tables = ImmutableMap.builder();
+    lookupProvider.getAllLookupNames().forEach(
+        name -> tables.put(name, new DruidTable(new LookupDataSource(name), ROW_SIGNATURE))
+    );
+
+    return tables.build();
+  }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 5d31868..0aee606 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -187,6 +187,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
     testRequestLogger = new TestRequestLogger();
     final PlannerFactory plannerFactory = new PlannerFactory(
         druidSchema,
+        CalciteTests.createMockLookupSchema(),
         systemSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         operatorTable,
@@ -827,6 +828,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
         CalciteTests.createSqlLifecycleFactory(
           new PlannerFactory(
               druidSchema,
+              CalciteTests.createMockLookupSchema(),
               systemSchema,
               CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
               operatorTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
index 9cc2ecf..aad9e2d 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
@@ -93,6 +93,7 @@ public class DruidStatementTest extends CalciteTestBase
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     final PlannerFactory plannerFactory = new PlannerFactory(
         druidSchema,
+        CalciteTests.createMockLookupSchema(),
         systemSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         operatorTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index e82fcae..446f946 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -32,7 +32,9 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
@@ -58,11 +60,13 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.join.JoinType;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -94,10 +98,13 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class BaseCalciteQueryTest extends CalciteTestBase
 {
-  public static final String NULL_VALUE = NullHandling.replaceWithDefault() ? "" : null;
+  public static final String NULL_STRING = NullHandling.defaultStringValue();
+  public static final Float NULL_FLOAT = NullHandling.defaultFloatValue();
+  public static final Long NULL_LONG = NullHandling.defaultLongValue();
   public static final String HLLC_STRING = VersionOneHyperLogLogCollector.class.getName();
 
   public static final Logger log = new Logger(BaseCalciteQueryTest.class);
@@ -329,7 +336,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
         false,
         true,
         null,
-        null, 
+        null,
         StringComparators.NUMERIC
     );
   }
@@ -363,6 +370,29 @@ public class BaseCalciteQueryTest extends CalciteTestBase
     return new ExpressionVirtualColumn(name, expression, outputType, CalciteTests.createExprMacroTable());
   }
 
+  public static JoinDataSource join( // helper: JoinDataSource.create with CalciteTests' ExprMacroTable
+      DataSource left,
+      DataSource right,
+      String rightPrefix,
+      String condition,
+      JoinType joinType
+  )
+  {
+    return JoinDataSource.create(
+        left,
+        right,
+        rightPrefix,
+        condition,
+        joinType,
+        CalciteTests.createExprMacroTable()
+    );
+  }
+
+  public static String equalsCondition(DruidExpression left, DruidExpression right)
+  {
+    return StringUtils.format("(%s == %s)", left.getExpression(), right.getExpression()); // "(<left> == <right>)"
+  }
+
   public static ExpressionPostAggregator expressionPostAgg(final String name, final String expression)
   {
     return new ExpressionPostAggregator(name, expression, null, CalciteTests.createExprMacroTable());
@@ -371,7 +401,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
   public static Druids.ScanQueryBuilder newScanQueryBuilder()
   {
     return new Druids.ScanQueryBuilder().resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-                                           .legacy(false);
+                                        .legacy(false);
   }
 
   @BeforeClass
@@ -501,17 +531,31 @@ public class BaseCalciteQueryTest extends CalciteTestBase
     testQuery(plannerConfig, QUERY_CONTEXT_DEFAULT, sql, authenticationResult, expectedQueries, expectedResults);
   }
 
-  private Query recursivelyOverrideContext(final Query q, final Map<String, Object> context)
+  /**
+   * Override not just the outer query context, but also the contexts of all subqueries.
+   */
+  private <T> Query<T> recursivelyOverrideContext(final Query<T> query, final Map<String, Object> context)
   {
-    final Query q2;
-    if (q.getDataSource() instanceof QueryDataSource) {
-      final Query subQuery = ((QueryDataSource) q.getDataSource()).getQuery();
-      q2 = q.withDataSource(new QueryDataSource(recursivelyOverrideContext(subQuery, context)));
+    return query.withDataSource(recursivelyOverrideContext(query.getDataSource(), context))
+                .withOverriddenContext(context);
+  }
+
+  /**
+   * Override the contexts of all subqueries of a particular datasource.
+   */
+  private DataSource recursivelyOverrideContext(final DataSource dataSource, final Map<String, Object> context)
+  {
+    if (dataSource instanceof QueryDataSource) {
+      final Query subquery = ((QueryDataSource) dataSource).getQuery();
+      return new QueryDataSource(recursivelyOverrideContext(subquery, context));
     } else {
-      q2 = q;
+      return dataSource.withChildren(
+          dataSource.getChildren()
+                    .stream()
+                    .map(ds -> recursivelyOverrideContext(ds, context))
+                    .collect(Collectors.toList())
+      );
     }
-
-    return q2.withOverriddenContext(context);
   }
 
   public void testQuery(
@@ -595,6 +639,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
 
     final PlannerFactory plannerFactory = new PlannerFactory(
         druidSchema,
+        CalciteTests.createMockLookupSchema(),
         systemSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         operatorTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 74e033d..3301b87 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -32,8 +32,10 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -82,6 +84,7 @@ import org.apache.druid.query.topn.InvertedTopNMetricSpec;
 import org.apache.druid.query.topn.NumericTopNMetricSpec;
 import org.apache.druid.query.topn.TopNQueryBuilder;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.join.JoinType;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.planner.Calcites;
@@ -318,6 +321,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         "SELECT DISTINCT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA",
         ImmutableList.of(),
         ImmutableList.of(
+            new Object[]{"lookup"},
             new Object[]{"druid"},
             new Object[]{"sys"},
             new Object[]{"INFORMATION_SCHEMA"}
@@ -344,6 +348,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
             .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
             .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
             .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
+            .add(new Object[]{"lookup", "lookyloo", "TABLE"})
             .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
             .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
             .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
@@ -371,6 +376,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
             .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
             .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
             .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
+            .add(new Object[]{"lookup", "lookyloo", "TABLE"})
             .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
             .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
             .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
@@ -488,7 +494,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   @Test
   public void testSelectStar() throws Exception
   {
-    String hyperLogLogCollectorClassName = HLLC_STRING;
     testQuery(
         PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
         QUERY_CONTEXT_DEFAULT,
@@ -504,23 +509,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 .build()
         ),
         ImmutableList.of(
-            new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1f, 1.0, hyperLogLogCollectorClassName},
-            new Object[]{
-                timestamp("2000-01-02"), 1L, "10.1", NULL_VALUE, "[\"b\",\"c\"]", 2f, 2.0, hyperLogLogCollectorClassName
-            },
-            new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, hyperLogLogCollectorClassName},
-            new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4f, 4.0, hyperLogLogCollectorClassName},
-            new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5f, 5.0, hyperLogLogCollectorClassName},
-            new Object[]{
-                timestamp("2001-01-03"),
-                1L,
-                "abc",
-                NULL_VALUE,
-                NULL_VALUE,
-                6f,
-                6.0,
-                hyperLogLogCollectorClassName
-            }
+            new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1f, 1.0, HLLC_STRING},
+            new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2f, 2.0, HLLC_STRING},
+            new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, HLLC_STRING},
+            new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4f, 4.0, HLLC_STRING},
+            new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5f, 5.0, HLLC_STRING},
+            new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_STRING, NULL_STRING, 6f, 6.0, HLLC_STRING}
         )
     );
   }
@@ -617,7 +611,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         ),
         ImmutableList.of(
             new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1.0f, 1.0, HLLC_STRING},
-            new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_VALUE, "[\"b\",\"c\"]", 2.0f, 2.0, HLLC_STRING}
+            new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2.0f, 2.0, HLLC_STRING}
         )
     );
   }
@@ -642,7 +636,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         ),
         ImmutableList.of(
             new Object[]{"a"},
-            new Object[]{NULL_VALUE}
+            new Object[]{NULL_STRING}
         )
     );
   }
@@ -691,8 +685,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 .build()
         ),
         ImmutableList.of(
-            new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_VALUE, NULL_VALUE, 6f, 6d, HLLC_STRING},
-            new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5f, 5d, HLLC_STRING}
+            new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_STRING, NULL_STRING, 6f, 6d, HLLC_STRING},
+            new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5f, 5d, HLLC_STRING}
         )
     );
   }
@@ -718,11 +712,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         ),
         ImmutableList.of(
             new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1f, 1.0, HLLC_STRING},
-            new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_VALUE, "[\"b\",\"c\"]", 2f, 2.0, HLLC_STRING},
+            new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2f, 2.0, HLLC_STRING},
             new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, HLLC_STRING},
             new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4f, 4.0, HLLC_STRING},
-            new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5f, 5.0, HLLC_STRING},
-            new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_VALUE, NULL_VALUE, 6f, 6.0, HLLC_STRING}
+            new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5f, 5.0, HLLC_STRING},
+            new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_STRING, NULL_STRING, 6f, 6.0, HLLC_STRING}
         )
     );
   }
@@ -744,7 +738,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         ),
         ImmutableList.of(
             new Object[]{"a", "a"},
-            new Object[]{NULL_VALUE, NULL_VALUE}
+            new Object[]{NULL_STRING, NULL_STRING}
         )
     );
   }
@@ -2612,10 +2606,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                         .build()
         ),
         ImmutableList.of(
-            new Object[]{"", NULL_VALUE},
-            new Object[]{"1", NULL_VALUE},
+            new Object[]{"", NULL_STRING},
+            new Object[]{"1", NULL_STRING},
             new Object[]{"10.1", "0.1"},
-            new Object[]{"2", NULL_VALUE},
+            new Object[]{"2", NULL_STRING},
             new Object[]{"abc", "bc"},
             new Object[]{"def", "ef"}
         )
@@ -2664,9 +2658,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
             new Object[]{"10.1", "0.1"},
             new Object[]{"abc", "bc"},
             new Object[]{"def", "ef"},
-            new Object[]{"1", NULL_VALUE},
-            new Object[]{"2", NULL_VALUE},
-            new Object[]{"", NULL_VALUE}
+            new Object[]{"1", NULL_STRING},
+            new Object[]{"2", NULL_STRING},
+            new Object[]{"", NULL_STRING}
         )
     );
   }
@@ -2694,10 +2688,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 .build()
         ),
         ImmutableList.of(
-            new Object[]{"", NULL_VALUE},
-            new Object[]{"1", NULL_VALUE},
+            new Object[]{"", NULL_STRING},
+            new Object[]{"1", NULL_STRING},
             new Object[]{"10.1", "0.1"},
-            new Object[]{"2", NULL_VALUE},
+            new Object[]{"2", NULL_STRING},
             new Object[]{"abc", "bc"},
             new Object[]{"def", "ef"}
         )
@@ -2735,9 +2729,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
             new Object[]{"10.1", "0.1"},
             new Object[]{"abc", "bc"},
             new Object[]{"def", "ef"},
-            new Object[]{"1", NULL_VALUE},
-            new Object[]{"2", NULL_VALUE},
-            new Object[]{"", NULL_VALUE}
+            new Object[]{"1", NULL_STRING},
+            new Object[]{"2", NULL_STRING},
+            new Object[]{"", NULL_STRING}
         )
     );
   }
@@ -3452,7 +3446,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         ImmutableList.of(
             new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1.0f, 1.0d, HLLC_STRING},
             new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4.0f, 4.0d, HLLC_STRING},
-            new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5.0f, 5.0d, HLLC_STRING}
+            new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5.0f, 5.0d, HLLC_STRING}
         )
     );
   }
@@ -7602,6 +7596,416 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   }
 
   @Test
+  public void testFilterAndGroupByLookupUsingJoinOperatorAllowNulls() throws Exception
+  {
+    // LEFT JOIN onto a lookup, with the WHERE clause explicitly admitting null v. Cannot vectorize JOIN operator.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT lookyloo.v, COUNT(*)\n"
+        + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n"
+        + "WHERE lookyloo.v <> 'xa' OR lookyloo.v IS NULL\n"
+        + "GROUP BY lookyloo.v",
+        ImmutableList.of( // expected native queries
+            GroupByQuery.builder()
+                        .setDataSource(
+                            join(
+                                new TableDataSource(CalciteTests.DATASOURCE1),
+                                new LookupDataSource("lookyloo"),
+                                "j0.", // prefix under which the lookup's columns are referenced below (j0.k, j0.v)
+                                equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+                                JoinType.LEFT
+                            )
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimFilter(or(not(selector("j0.v", "xa", null)), selector("j0.v", null, null))) // v <> 'xa' OR v IS NULL
+                        .setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
+                        .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of( // expected (v, COUNT(*)) result rows
+            new Object[]{NULL_STRING, 3L},
+            new Object[]{"xabc", 1L}
+        )
+    );
+  }
+
+  @Test
+  public void testFilterAndGroupByLookupUsingJoinOperator() throws Exception
+  {
+    // LEFT JOIN onto a lookup, filtered and grouped on the lookup value. Cannot vectorize JOIN operator.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT lookyloo.v, COUNT(*)\n"
+        + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n"
+        + "WHERE lookyloo.v <> 'xa'\n"
+        + "GROUP BY lookyloo.v",
+        ImmutableList.of( // expected native queries
+            GroupByQuery.builder()
+                        .setDataSource(
+                            join(
+                                new TableDataSource(CalciteTests.DATASOURCE1),
+                                new LookupDataSource("lookyloo"),
+                                "j0.", // prefix under which the lookup's columns are referenced below (j0.k, j0.v)
+                                equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+                                JoinType.LEFT
+                            )
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setDimFilter(not(selector("j0.v", "xa", null))) // v <> 'xa'
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
+                        .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of( // expected (v, COUNT(*)) result rows
+            new Object[]{NULL_STRING, 3L}, // NOTE(review): NULL_STRING group expected despite "v <> 'xa'" - confirm null semantics
+            new Object[]{"xabc", 1L}
+        )
+    );
+  }
+
+  @Test
+  public void testFilterAndGroupByLookupUsingJoinOperatorBackwards() throws Exception
+  {
+    // Cannot vectorize JOIN operator.
+    cannotVectorize();
+
+    // Like "testFilterAndGroupByLookupUsingJoinOperator", but with the table and lookup reversed.
+    testQuery(
+        "SELECT lookyloo.v, COUNT(*)\n"
+        + "FROM lookup.lookyloo RIGHT JOIN foo ON foo.dim2 = lookyloo.k\n"
+        + "WHERE lookyloo.v <> 'xa'\n"
+        + "GROUP BY lookyloo.v",
+        ImmutableList.of( // expected native queries
+            GroupByQuery.builder()
+                        .setDataSource(
+                            join(
+                                new TableDataSource(CalciteTests.DATASOURCE1),
+                                new LookupDataSource("lookyloo"),
+                                "j0.",
+                                equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+                                JoinType.LEFT // the SQL RIGHT JOIN (lookup first) is expected as a LEFT join with the table first
+                            )
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setDimFilter(not(selector("j0.v", "xa", null))) // v <> 'xa'
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
+                        .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of( // expected (v, COUNT(*)) result rows
+            new Object[]{NULL_STRING, 3L}, // NOTE(review): NULL_STRING group expected despite "v <> 'xa'" - confirm null semantics
+            new Object[]{"xabc", 1L}
+        )
+    );
+  }
+
+  @Test
+  public void testGroupByInnerJoinOnLookupUsingJoinOperator() throws Exception
+  {
+    // INNER JOIN onto a lookup (keyed on dim1), grouped on the lookup value. Cannot vectorize JOIN operator.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT lookyloo.v, COUNT(*)\n"
+        + "FROM foo INNER JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+        + "GROUP BY lookyloo.v",
+        ImmutableList.of( // expected native queries
+            GroupByQuery.builder()
+                        .setDataSource(
+                            join(
+                                new TableDataSource(CalciteTests.DATASOURCE1),
+                                new LookupDataSource("lookyloo"),
+                                "j0.", // prefix under which the lookup's columns are referenced below (j0.k, j0.v)
+                                equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+                                JoinType.INNER
+                            )
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
+                        .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of( // expected (v, COUNT(*)) result rows
+            new Object[]{"xabc", 1L}
+        )
+    );
+  }
+
+  @Test
+  public void testSelectOnLookupUsingInnerJoinOperator() throws Exception
+  {
+    testQuery(
+        "SELECT dim2, lookyloo.*\n"
+        + "FROM foo INNER JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n",
+        ImmutableList.of( // expected native queries
+            newScanQueryBuilder()
+                .dataSource(
+                    join(
+                        new TableDataSource(CalciteTests.DATASOURCE1),
+                        new LookupDataSource("lookyloo"),
+                        "j0.",
+                        equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+                        JoinType.INNER
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("dim2", "j0.k", "j0.v") // "lookyloo.*" is expected to expand to the prefixed k and v columns
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of( // expected (dim2, k, v) result rows
+            new Object[]{"a", "a", "xa"},
+            new Object[]{"a", "a", "xa"},
+            new Object[]{"abc", "abc", "xabc"}
+        )
+    );
+  }
+
+  @Test
+  public void testLeftJoinTwoLookupsUsingJoinOperator() throws Exception
+  {
+    testQuery(
+        "SELECT dim1, dim2, l1.v, l2.v\n"
+        + "FROM foo\n"
+        + "LEFT JOIN lookup.lookyloo l1 ON foo.dim1 = l1.k\n"
+        + "LEFT JOIN lookup.lookyloo l2 ON foo.dim2 = l2.k\n",
+        ImmutableList.of( // expected native queries
+            newScanQueryBuilder()
+                .dataSource(
+                    join( // outer join: (foo LEFT JOIN l1) LEFT JOIN l2
+                        join(
+                            new TableDataSource(CalciteTests.DATASOURCE1),
+                            new LookupDataSource("lookyloo"),
+                            "j0.", // prefix for l1
+                            equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+                            JoinType.LEFT
+                        ),
+                        new LookupDataSource("lookyloo"),
+                        "_j0.", // prefix for l2; distinct from the inner join's "j0."
+                        equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("_j0.k")),
+                        JoinType.LEFT
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("_j0.v", "dim1", "dim2", "j0.v") // listed in a different order than the SELECT list
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of( // expected (dim1, dim2, l1.v, l2.v) result rows
+            new Object[]{"", "a", NULL_STRING, "xa"},
+            new Object[]{"10.1", NULL_STRING, NULL_STRING, NULL_STRING},
+            new Object[]{"2", "", NULL_STRING, NULL_STRING},
+            new Object[]{"1", "a", NULL_STRING, "xa"},
+            new Object[]{"def", "abc", NULL_STRING, "xabc"},
+            new Object[]{"abc", NULL_STRING, "xabc", NULL_STRING}
+        )
+    );
+  }
+
+  @Test
+  public void testLeftJoinLookupOntoLookupUsingJoinOperator() throws Exception
+  {
+    testQuery(
+        "SELECT dim2, l1.v, l2.v\n"
+        + "FROM foo\n"
+        + "LEFT JOIN lookup.lookyloo l1 ON foo.dim2 = l1.k\n"
+        + "LEFT JOIN lookup.lookyloo l2 ON l1.k = l2.k",
+        ImmutableList.of( // expected native queries
+            newScanQueryBuilder()
+                .dataSource(
+                    join( // outer join: (foo LEFT JOIN l1) LEFT JOIN l2
+                        join(
+                            new TableDataSource(CalciteTests.DATASOURCE1),
+                            new LookupDataSource("lookyloo"),
+                            "j0.", // prefix for l1
+                            equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+                            JoinType.LEFT
+                        ),
+                        new LookupDataSource("lookyloo"),
+                        "_j0.", // prefix for l2
+                        equalsCondition(DruidExpression.fromColumn("j0.k"), DruidExpression.fromColumn("_j0.k")), // l1.k = l2.k
+                        JoinType.LEFT
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("_j0.v", "dim2", "j0.v") // listed in a different order than the SELECT list
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of( // expected (dim2, l1.v, l2.v) result rows
+            new Object[]{"a", "xa", "xa"},
+            new Object[]{NULL_STRING, NULL_STRING, NULL_STRING},
+            new Object[]{"", NULL_STRING, NULL_STRING},
+            new Object[]{"a", "xa", "xa"},
+            new Object[]{"abc", "xabc", "xabc"},
+            new Object[]{NULL_STRING, NULL_STRING, NULL_STRING}
+        )
+    );
+  }
+
+  @Test
+  public void testLeftJoinThreeLookupsUsingJoinOperator() throws Exception
+  {
+    testQuery(
+        "SELECT dim1, dim2, l1.v, l2.v, l3.v\n"
+        + "FROM foo\n"
+        + "LEFT JOIN lookup.lookyloo l1 ON foo.dim1 = l1.k\n"
+        + "LEFT JOIN lookup.lookyloo l2 ON foo.dim2 = l2.k\n"
+        + "LEFT JOIN lookup.lookyloo l3 ON l2.k = l3.k",
+        ImmutableList.of( // expected native queries
+            newScanQueryBuilder()
+                .dataSource(
+                    join( // outermost join: ((foo LEFT JOIN l1) LEFT JOIN l2) LEFT JOIN l3
+                        join(
+                            join(
+                                new TableDataSource(CalciteTests.DATASOURCE1),
+                                new LookupDataSource("lookyloo"),
+                                "j0.", // prefix for l1
+                                equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+                                JoinType.LEFT
+                            ),
+                            new LookupDataSource("lookyloo"),
+                            "_j0.", // prefix for l2
+                            equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("_j0.k")),
+                            JoinType.LEFT
+                        ),
+                        new LookupDataSource("lookyloo"),
+                        "__j0.", // prefix for l3
+                        equalsCondition(DruidExpression.fromColumn("_j0.k"), DruidExpression.fromColumn("__j0.k")), // l2.k = l3.k
+                        JoinType.LEFT
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__j0.v", "_j0.v", "dim1", "dim2", "j0.v") // listed in a different order than the SELECT list
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of( // expected (dim1, dim2, l1.v, l2.v, l3.v) result rows
+            new Object[]{"", "a", NULL_STRING, "xa", "xa"},
+            new Object[]{"10.1", NULL_STRING, NULL_STRING, NULL_STRING, NULL_STRING},
+            new Object[]{"2", "", NULL_STRING, NULL_STRING, NULL_STRING},
+            new Object[]{"1", "a", NULL_STRING, "xa", "xa"},
+            new Object[]{"def", "abc", NULL_STRING, "xabc", "xabc"},
+            new Object[]{"abc", NULL_STRING, "xabc", NULL_STRING, NULL_STRING}
+        )
+    );
+  }
+
+  @Test
+  public void testSelectOnLookupUsingLeftJoinOperator() throws Exception
+  {
+    testQuery(
+        "SELECT dim1, lookyloo.*\n"
+        + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+        + "WHERE lookyloo.v <> 'xxx' OR lookyloo.v IS NULL",
+        ImmutableList.of( // expected native queries
+            newScanQueryBuilder()
+                .dataSource(
+                    join(
+                        new TableDataSource(CalciteTests.DATASOURCE1),
+                        new LookupDataSource("lookyloo"),
+                        "j0.",
+                        equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+                        JoinType.LEFT
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .filters(or(not(selector("j0.v", "xxx", null)), selector("j0.v", null, null))) // v <> 'xxx' OR v IS NULL
+                .columns("dim1", "j0.k", "j0.v")
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of( // expected (dim1, k, v) result rows; only "abc" has non-null lookup columns
+            new Object[]{"", NULL_STRING, NULL_STRING},
+            new Object[]{"10.1", NULL_STRING, NULL_STRING},
+            new Object[]{"2", NULL_STRING, NULL_STRING},
+            new Object[]{"1", NULL_STRING, NULL_STRING},
+            new Object[]{"def", NULL_STRING, NULL_STRING},
+            new Object[]{"abc", "abc", "xabc"}
+        )
+    );
+  }
+
+  @Test
+  public void testSelectOnLookupUsingRightJoinOperator() throws Exception
+  {
+    testQuery(
+        "SELECT dim1, lookyloo.*\n"
+        + "FROM foo RIGHT JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+        + "WHERE lookyloo.v <> 'xxx' OR lookyloo.v IS NULL",
+        ImmutableList.of( // expected native queries
+            newScanQueryBuilder()
+                .dataSource(
+                    join(
+                        new TableDataSource(CalciteTests.DATASOURCE1),
+                        new LookupDataSource("lookyloo"),
+                        "j0.",
+                        equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+                        JoinType.RIGHT // kept as a RIGHT join: table on the left, lookup on the right
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .filters(or(not(selector("j0.v", "xxx", null)), selector("j0.v", null, null))) // v <> 'xxx' OR v IS NULL
+                .columns("dim1", "j0.k", "j0.v")
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of( // expected (dim1, k, v) result rows; dim1 is NULL_STRING for keys "a" and "nosuchkey"
+            new Object[]{"abc", "abc", "xabc"},
+            new Object[]{NULL_STRING, "a", "xa"},
+            new Object[]{NULL_STRING, "nosuchkey", "mysteryvalue"}
+        )
+    );
+  }
+
+  @Test
+  public void testSelectOnLookupUsingFullJoinOperator() throws Exception
+  {
+    testQuery(
+        "SELECT dim1, m1, cnt, lookyloo.*\n"
+        + "FROM foo FULL JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+        + "WHERE lookyloo.v <> 'xxx' OR lookyloo.v IS NULL",
+        ImmutableList.of( // expected native queries
+            newScanQueryBuilder()
+                .dataSource(
+                    join(
+                        new TableDataSource(CalciteTests.DATASOURCE1),
+                        new LookupDataSource("lookyloo"),
+                        "j0.",
+                        equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+                        JoinType.FULL
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .filters(or(not(selector("j0.v", "xxx", null)), selector("j0.v", null, null))) // v <> 'xxx' OR v IS NULL
+                .columns("cnt", "dim1", "j0.k", "j0.v", "m1") // listed in a different order than the SELECT list
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of( // expected (dim1, m1, cnt, k, v) result rows
+            new Object[]{"", 1f, 1L, NULL_STRING, NULL_STRING},
+            new Object[]{"10.1", 2f, 1L, NULL_STRING, NULL_STRING},
+            new Object[]{"2", 3f, 1L, NULL_STRING, NULL_STRING},
+            new Object[]{"1", 4f, 1L, NULL_STRING, NULL_STRING},
+            new Object[]{"def", 5f, 1L, NULL_STRING, NULL_STRING},
+            new Object[]{"abc", 6f, 1L, "abc", "xabc"},
+            new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "a", "xa"}, // last two rows carry NULL_* constants for the foo columns
+            new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "nosuchkey", "mysteryvalue"}
+        )
+    );
+  }
+
+  @Test
   public void testCountDistinctOfLookup() throws Exception
   {
     // Cannot vectorize due to "cardinality" aggregator.
@@ -7642,6 +8046,46 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   }
 
   @Test
+  public void testCountDistinctOfLookupUsingJoinOperator() throws Exception
+  {
+    // COUNT(DISTINCT ...) over a lookup value reached through a LEFT JOIN. Cannot yet vectorize the JOIN operator.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT COUNT(DISTINCT lookyloo.v)\n"
+        + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k",
+        ImmutableList.of( // expected native queries
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(
+                      join(
+                          new TableDataSource(CalciteTests.DATASOURCE1),
+                          new LookupDataSource("lookyloo"),
+                          "j0.", // prefix under which the lookup's columns are referenced below (j0.k, j0.v)
+                          equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
+                          JoinType.LEFT
+                      )
+                  )
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(aggregators(
+                      new CardinalityAggregatorFactory( // COUNT(DISTINCT v) is expected as a cardinality aggregator on j0.v
+                          "a0",
+                          null,
+                          ImmutableList.of(DefaultDimensionSpec.of("j0.v")),
+                          false,
+                          true
+                      )
+                  ))
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{NullHandling.replaceWithDefault() ? 2L : 1L} // expected count depends on the null-handling mode
+        )
+    );
+  }
+
+  @Test
   public void testTimeseries() throws Exception
   {
     testQuery(
@@ -10648,8 +11092,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         )
     );
   }
-  
-  
+
+
 
   @Test
   public void testQueryContextOuterLimit() throws Exception
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
index d525fff..20d77bc 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
@@ -147,6 +147,7 @@ public class SqlResourceTest extends CalciteTestBase
 
     final PlannerFactory plannerFactory = new PlannerFactory(
         druidSchema,
+        CalciteTests.createMockLookupSchema(),
         systemSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         operatorTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java
index 763524e..676c0b7 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitesTest.java
@@ -42,14 +42,14 @@ public class CalcitesTest extends CalciteTestBase
   @Test
   public void testFindUnusedPrefix()
   {
-    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar")));
-    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
-    Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
-    Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
-    Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
-    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
-    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
-    Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
-    Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar", "x")));
+    Assert.assertEquals("_x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar", "x0")));
+    Assert.assertEquals("_x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar", "x4")));
+    Assert.assertEquals("__x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "_xbxx")));
+    Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "xa", "_x")));
+    Assert.assertEquals("__x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
   }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java
new file mode 100644
index 0000000..7e0ed30
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/rel/DruidRelsTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rel;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+public class DruidRelsTest
+{
+  @Test
+  public void test_isScanOrMapping_scan()
+  {
+    final DruidRel<?> rel = mockDruidRel(DruidQueryRel.class, PartialDruidQuery.Stage.SCAN, null, null);
+    Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertTrue(DruidRels.isScanOrMapping(rel, false));
+    EasyMock.verify(rel, rel.getPartialDruidQuery());
+  }
+
+  @Test
+  public void test_isScanOrMapping_scanJoin()
+  {
+    final DruidRel<?> rel = mockDruidRel(DruidJoinQueryRel.class, PartialDruidQuery.Stage.SCAN, null, null);
+    Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+    EasyMock.verify(rel, rel.getPartialDruidQuery());
+  }
+
+  @Test
+  public void test_isScanOrMapping_scanQuery()
+  {
+    final DruidRel<?> rel = mockDruidRel(DruidOuterQueryRel.class, PartialDruidQuery.Stage.SCAN, null, null);
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+    EasyMock.verify(rel, rel.getPartialDruidQuery());
+  }
+
+  @Test
+  public void test_isScanOrMapping_mapping()
+  {
+    final Project project = mockProject(true);
+    final DruidRel<?> rel = mockDruidRel(
+        DruidQueryRel.class,
+        PartialDruidQuery.Stage.SELECT_PROJECT,
+        project,
+        null
+    );
+    Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertTrue(DruidRels.isScanOrMapping(rel, false));
+
+    EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+  }
+
+  @Test
+  public void test_isScanOrMapping_mappingJoin()
+  {
+    final Project project = mockProject(true);
+    final DruidRel<?> rel = mockDruidRel(
+        DruidJoinQueryRel.class,
+        PartialDruidQuery.Stage.SELECT_PROJECT,
+        project,
+        null
+    );
+    Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+    EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+  }
+
+  @Test
+  public void test_isScanOrMapping_nonMapping()
+  {
+    final Project project = mockProject(false);
+    final DruidRel<?> rel = mockDruidRel(
+        DruidQueryRel.class,
+        PartialDruidQuery.Stage.SELECT_PROJECT,
+        project,
+        null
+    );
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+    EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+  }
+
+  @Test
+  public void test_isScanOrMapping_nonMappingJoin()
+  {
+    final Project project = mockProject(false);
+    final DruidRel<?> rel = mockDruidRel(
+        DruidJoinQueryRel.class,
+        PartialDruidQuery.Stage.SELECT_PROJECT,
+        project,
+        null
+    );
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+    EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+  }
+
+  @Test
+  public void test_isScanOrMapping_filterThenProject()
+  {
+    final Project project = mockProject(true);
+    final DruidRel<?> rel = mockDruidRel(
+        DruidQueryRel.class,
+        PartialDruidQuery.Stage.SELECT_PROJECT,
+        project,
+        mockFilter()
+    );
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+    EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+  }
+
+  @Test
+  public void test_isScanOrMapping_filterThenProjectJoin()
+  {
+    final Project project = mockProject(true);
+    final DruidRel<?> rel = mockDruidRel(
+        DruidJoinQueryRel.class,
+        PartialDruidQuery.Stage.SELECT_PROJECT,
+        project,
+        mockFilter()
+    );
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+    EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+  }
+
+  @Test
+  public void test_isScanOrMapping_filter()
+  {
+    final DruidRel<?> rel = mockDruidRel(
+        DruidQueryRel.class,
+        PartialDruidQuery.Stage.WHERE_FILTER,
+        null,
+        mockFilter()
+    );
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+    EasyMock.verify(rel, rel.getPartialDruidQuery());
+  }
+
+  @Test
+  public void test_isScanOrMapping_filterJoin()
+  {
+    final DruidRel<?> rel = mockDruidRel(
+        DruidJoinQueryRel.class,
+        PartialDruidQuery.Stage.WHERE_FILTER,
+        null,
+        mockFilter()
+    );
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
+    Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
+
+    EasyMock.verify(rel, rel.getPartialDruidQuery());
+  }
+
+  @Test
+  public void test_isScanOrMapping_allStages()
+  {
+    final ImmutableSet<PartialDruidQuery.Stage> okStages = ImmutableSet.of(
+        PartialDruidQuery.Stage.SCAN,
+        PartialDruidQuery.Stage.SELECT_PROJECT
+    );
+
+    for (PartialDruidQuery.Stage stage : PartialDruidQuery.Stage.values()) {
+      final Project project = mockProject(true);
+      final DruidRel<?> rel = mockDruidRel(
+          DruidQueryRel.class,
+          stage,
+          project,
+          null
+      );
+
+      Assert.assertEquals(stage.toString(), okStages.contains(stage), DruidRels.isScanOrMapping(rel, true));
+      Assert.assertEquals(stage.toString(), okStages.contains(stage), DruidRels.isScanOrMapping(rel, false));
+
+      EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
+    }
+  }
+
+  private static DruidRel<?> mockDruidRel(
+      final Class<? extends DruidRel<?>> clazz,
+      final PartialDruidQuery.Stage stage,
+      @Nullable Project selectProject,
+      @Nullable Filter whereFilter
+  )
+  {
+    // DruidQueryRels rely on a ton of Calcite stuff like RelOptCluster, RelOptTable, etc, which is quite verbose to
+    // create real instances of. So, tragically, we'll use EasyMock.
+    final DruidRel<?> mockRel = EasyMock.mock(clazz);
+    final PartialDruidQuery mockPartialQuery = EasyMock.mock(PartialDruidQuery.class);
+    EasyMock.expect(mockPartialQuery.stage()).andReturn(stage).anyTimes();
+    EasyMock.expect(mockPartialQuery.getSelectProject()).andReturn(selectProject).anyTimes();
+    EasyMock.expect(mockPartialQuery.getWhereFilter()).andReturn(whereFilter).anyTimes();
+    EasyMock.expect(mockRel.getPartialDruidQuery()).andReturn(mockPartialQuery).anyTimes();
+    EasyMock.replay(mockRel, mockPartialQuery);
+    return mockRel;
+  }
+
+  private static Project mockProject(final boolean mapping)
+  {
+    final Project mockProject = EasyMock.mock(Project.class);
+    EasyMock.expect(mockProject.isMapping()).andReturn(mapping).anyTimes();
+    EasyMock.replay(mockProject);
+    return mockProject;
+  }
+
+  private static Filter mockFilter()
+  {
+    return EasyMock.mock(Filter.class);
+  }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java
new file mode 100644
index 0000000..dd706ff
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rule;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+public class DruidJoinRuleTest
+{
+  private final RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
+
+  private final RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE);
+
+  private final RelDataType leftType =
+      new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE).createStructType(
+          ImmutableList.of(typeFactory.createSqlType(SqlTypeName.VARCHAR)),
+          ImmutableList.of("left")
+      );
+
+  private final RelDataType joinType =
+      new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE).createStructType(
+          ImmutableList.of(
+              typeFactory.createSqlType(SqlTypeName.VARCHAR),
+              typeFactory.createSqlType(SqlTypeName.VARCHAR)
+          ),
+          ImmutableList.of("left", "right")
+      );
+
+  @Test
+  public void test_canHandleCondition_leftEqRight()
+  {
+    Assert.assertTrue(
+        DruidJoinRule.canHandleCondition(
+            rexBuilder.makeCall(
+                SqlStdOperatorTable.EQUALS,
+                rexBuilder.makeInputRef(joinType, 0),
+                rexBuilder.makeInputRef(joinType, 1)
+            ),
+            leftType
+        )
+    );
+  }
+
+  @Test
+  public void test_canHandleCondition_leftFnEqRight()
+  {
+    Assert.assertTrue(
+        DruidJoinRule.canHandleCondition(
+            rexBuilder.makeCall(
+                SqlStdOperatorTable.EQUALS,
+                rexBuilder.makeCall(
+                    SqlStdOperatorTable.CONCAT,
+                    rexBuilder.makeLiteral("foo"),
+                    rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0)
+                ),
+                rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
+            ),
+            leftType
+        )
+    );
+  }
+
+  @Test
+  public void test_canHandleCondition_leftEqRightFn()
+  {
+    Assert.assertFalse(
+        DruidJoinRule.canHandleCondition(
+            rexBuilder.makeCall(
+                SqlStdOperatorTable.EQUALS,
+                rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
+                rexBuilder.makeCall(
+                    SqlStdOperatorTable.CONCAT,
+                    rexBuilder.makeLiteral("foo"),
+                    rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
+                )
+            ),
+            leftType
+        )
+    );
+  }
+
+  @Test
+  public void test_canHandleCondition_leftEqLeft()
+  {
+    Assert.assertFalse(
+        DruidJoinRule.canHandleCondition(
+            rexBuilder.makeCall(
+                SqlStdOperatorTable.EQUALS,
+                rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
+                rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0)
+            ),
+            leftType
+        )
+    );
+  }
+
+  @Test
+  public void test_canHandleCondition_rightEqRight()
+  {
+    Assert.assertFalse(
+        DruidJoinRule.canHandleCondition(
+            rexBuilder.makeCall(
+                SqlStdOperatorTable.EQUALS,
+                rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1),
+                rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
+            ),
+            leftType
+        )
+    );
+  }
+
+  @Test
+  public void test_canHandleCondition_true()
+  {
+    Assert.assertTrue(
+        DruidJoinRule.canHandleCondition(
+            rexBuilder.makeLiteral(true),
+            leftType
+        )
+    );
+  }
+
+  @Test
+  public void test_canHandleCondition_false()
+  {
+    Assert.assertTrue(
+        DruidJoinRule.canHandleCondition(
+            rexBuilder.makeLiteral(false),
+            leftType
+        )
+    );
+  }
+
+  @Test
+  public void test_decomposeAnd_notAnAnd()
+  {
+    final List<RexNode> rexNodes = DruidJoinRule.decomposeAnd(rexBuilder.makeInputRef(leftType, 0));
+
+    Assert.assertEquals(1, rexNodes.size());
+    Assert.assertEquals(rexBuilder.makeInputRef(leftType, 0), Iterables.getOnlyElement(rexNodes));
+  }
+
+  @Test
+  public void test_decomposeAnd_basic()
+  {
+    final List<RexNode> decomposed = DruidJoinRule.decomposeAnd(
+        rexBuilder.makeCall(
+            SqlStdOperatorTable.AND,
+            rexBuilder.makeCall(
+                SqlStdOperatorTable.AND,
+                rexBuilder.makeExactLiteral(BigDecimal.valueOf(1)),
+                rexBuilder.makeExactLiteral(BigDecimal.valueOf(2))
+            ),
+            rexBuilder.makeCall(
+                SqlStdOperatorTable.AND,
+                rexBuilder.makeExactLiteral(BigDecimal.valueOf(3)),
+                rexBuilder.makeExactLiteral(BigDecimal.valueOf(4))
+            )
+        )
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            rexBuilder.makeExactLiteral(BigDecimal.valueOf(1)),
+            rexBuilder.makeExactLiteral(BigDecimal.valueOf(2)),
+            rexBuilder.makeExactLiteral(BigDecimal.valueOf(3)),
+            rexBuilder.makeExactLiteral(BigDecimal.valueOf(4))
+        ),
+        decomposed
+    );
+  }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 5c225b4..e60f831 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -118,6 +118,7 @@ import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.LookupSchema;
 import org.apache.druid.sql.calcite.schema.MetadataSegmentView;
 import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
@@ -720,7 +721,10 @@ public class CalciteTests
         .buildMMappedIndex();
 
 
-    return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
+    return new SpecificSegmentsQuerySegmentWalker(
+        conglomerate,
+        INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class)
+    ).add(
         DataSegment.builder()
                    .dataSource(DATASOURCE1)
                    .interval(index1.getDataInterval())
@@ -857,6 +861,10 @@ public class CalciteTests
     ).get(0);
   }
 
+  public static LookupSchema createMockLookupSchema()
+  {
+    return new LookupSchema(INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class));
+  }
 
   public static SystemSchema createMockSystemSchema(
       final DruidSchema druidSchema,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 155dc41..ccb4a3d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -19,58 +19,168 @@
 
 package org.apache.druid.sql.calcite.util;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.common.io.Closeables;
+import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.FunctionalIterable;
-import org.apache.druid.query.Druids;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.NoopQueryRunner;
+import org.apache.druid.query.Queries;
 import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.QueryToolChestWarehouse;
+import org.apache.druid.query.RetryQueryRunnerConfig;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.scan.ScanQuery;
-import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.join.InlineJoinableFactory;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.Joinables;
+import org.apache.druid.segment.join.LookupJoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactory;
+import org.apache.druid.server.ClientQuerySegmentWalker;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.PartitionHolder;
 import org.joda.time.Interval;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 
+/**
+ * A self-contained class that executes queries similarly to the normal Druid query stack.
+ *
+ * {@link ClientQuerySegmentWalker}, the same class that Brokers use as the entry point for their query stack, is
+ * used directly. Our own class {@link DataServerLikeWalker} mimics the behavior of
+ * {@link org.apache.druid.server.coordination.ServerManager}, the entry point for Historicals. That class isn't used
+ * directly because the sheer volume of dependencies makes it quite verbose to use in a test environment.
+ */
 public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, Closeable
 {
   private final QueryRunnerFactoryConglomerate conglomerate;
+  private final QuerySegmentWalker walker;
+  private final JoinableFactory joinableFactory;
   private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines = new HashMap<>();
   private final List<Closeable> closeables = new ArrayList<>();
   private final List<DataSegment> segments = new ArrayList<>();
 
-  public SpecificSegmentsQuerySegmentWalker(QueryRunnerFactoryConglomerate conglomerate)
+  /**
+   * Create an instance using the provided query runner factory conglomerate and lookup provider.
+   */
+  public SpecificSegmentsQuerySegmentWalker(
+      final QueryRunnerFactoryConglomerate conglomerate,
+      final LookupExtractorFactoryContainerProvider lookupProvider
+  )
   {
     this.conglomerate = conglomerate;
+    this.joinableFactory = new MapJoinableFactory(
+        ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
+            .put(InlineDataSource.class, new InlineJoinableFactory())
+            .put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider))
+            .build()
+    );
+    this.walker = new ClientQuerySegmentWalker(
+        new NoopServiceEmitter(),
+        new DataServerLikeWalker(),
+        new QueryToolChestWarehouse()
+        {
+          @Override
+          public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
+          {
+            return conglomerate.findFactory(query).getToolchest();
+          }
+        },
+        new RetryQueryRunnerConfig(),
+        TestHelper.makeJsonMapper(),
+        new ServerConfig(),
+        null /* Cache */,
+        new CacheConfig()
+        {
+          @Override
+          public boolean isPopulateCache()
+          {
+            return false;
+          }
+
+          @Override
+          public boolean isUseCache()
+          {
+            return false;
+          }
+
+          @Override
+          public boolean isPopulateResultLevelCache()
+          {
+            return false;
+          }
+
+          @Override
+          public boolean isUseResultLevelCache()
+          {
+            return false;
+          }
+        }
+    );
+  }
+
+  /**
+   * Create an instance without any lookups.
+   */
+  public SpecificSegmentsQuerySegmentWalker(final QueryRunnerFactoryConglomerate conglomerate)
+  {
+    this(
+        conglomerate,
+        new LookupExtractorFactoryContainerProvider()
+        {
+          @Override
+          public Set<String> getAllLookupNames()
+          {
+            return Collections.emptySet();
+          }
+
+          @Override
+          public Optional<LookupExtractorFactoryContainer> get(String lookupName)
+          {
+            return Optional.empty();
+          }
+        }
+    );
   }
 
   public SpecificSegmentsQuerySegmentWalker add(
@@ -79,8 +189,10 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
   )
   {
     final Segment segment = new QueryableIndexSegment(index, descriptor.getId());
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines
-        .computeIfAbsent(descriptor.getDataSource(), datasource -> new VersionedIntervalTimeline<>(Ordering.natural()));
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.computeIfAbsent(
+        descriptor.getDataSource(),
+        datasource -> new VersionedIntervalTimeline<>(Ordering.natural())
+    );
     timeline.add(
         descriptor.getInterval(),
         descriptor.getVersion(),
@@ -102,70 +214,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
       final Iterable<Interval> intervals
   )
   {
-    Query<T> newQuery = query;
-    if (query instanceof ScanQuery && ((ScanQuery) query).getOrder() != ScanQuery.Order.NONE) {
-      newQuery = (Query<T>) Druids.ScanQueryBuilder.copy((ScanQuery) query)
-                                                   .intervals(new MultipleSpecificSegmentSpec(ImmutableList.of()))
-                                                   .build();
-    }
-
-    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(newQuery);
-    if (factory == null) {
-      throw new ISE("Unknown query type[%s].", newQuery.getClass());
-    }
-
-    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
-
-    return new FinalizeResultsQueryRunner<>(
-        toolChest.postMergeQueryDecoration(
-            toolChest.mergeResults(
-                toolChest.preMergeQueryDecoration(
-                    (queryPlus, responseContext) -> {
-                      Query<T> query1 = queryPlus.getQuery();
-                      Query<T> newQuery1 = query1;
-                      if (query instanceof ScanQuery && ((ScanQuery) query).getOrder() != ScanQuery.Order.NONE) {
-                        newQuery1 = (Query<T>) Druids.ScanQueryBuilder.copy((ScanQuery) query)
-                                                                      .intervals(new MultipleSpecificSegmentSpec(
-                                                                          ImmutableList.of(new SegmentDescriptor(
-                                                                              Intervals.of("2015-04-12/2015-04-13"),
-                                                                              "4",
-                                                                              0
-                                                                          ))))
-                                                                      .context(ImmutableMap.of(
-                                                                          ScanQuery.CTX_KEY_OUTERMOST,
-                                                                          false
-                                                                      ))
-                                                                      .build();
-                      }
-                      final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = getTimelineForTableDataSource(
-                          newQuery1);
-                      return makeBaseRunner(
-                          newQuery1,
-                          toolChest,
-                          factory,
-                          FunctionalIterable
-                              .create(intervals)
-                              .transformCat(
-                                  interval -> timeline.lookup(interval)
-                              )
-                              .transformCat(
-                                  holder -> FunctionalIterable
-                                      .create(holder.getObject())
-                                      .transform(
-                                          chunk -> new SegmentDescriptor(
-                                              holder.getInterval(),
-                                              holder.getVersion(),
-                                              chunk.getChunkNumber()
-                                          )
-                                      )
-                              )
-                      ).run(QueryPlus.wrap(newQuery1), responseContext);
-                    }
-                )
-            )
-        ),
-        toolChest
-    );
+    return walker.getQueryRunnerForIntervals(query, intervals);
   }
 
   @Override
@@ -174,23 +223,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
       final Iterable<SegmentDescriptor> specs
   )
   {
-    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
-    if (factory == null) {
-      throw new ISE("Unknown query type[%s].", query.getClass());
-    }
-
-    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
-
-    return new FinalizeResultsQueryRunner<>(
-        toolChest.postMergeQueryDecoration(
-            toolChest.mergeResults(
-                toolChest.preMergeQueryDecoration(
-                    makeBaseRunner(query, toolChest, factory, specs)
-                )
-            )
-        ),
-        toolChest
-    );
+    return walker.getQueryRunnerForSegments(query, specs);
   }
 
   @Override
@@ -201,52 +234,182 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
     }
   }
 
-  private <T> VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimelineForTableDataSource(Query<T> query)
+  private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Interval interval)
   {
-    if (query.getDataSource() instanceof TableDataSource) {
-      return timelines.get(((TableDataSource) query.getDataSource()).getName());
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
+
+    if (timeline == null) {
+      return Collections.emptyList();
     } else {
-      throw new UOE("DataSource type[%s] unsupported", query.getDataSource().getClass().getName());
+      final List<WindowedSegment> retVal = new ArrayList<>();
+
+      for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : timeline.lookup(interval)) {
+        for (PartitionChunk<ReferenceCountingSegment> chunk : holder.getObject()) {
+          retVal.add(new WindowedSegment(chunk.getObject(), holder.getInterval()));
+        }
+      }
+
+      return retVal;
     }
   }
 
-  private <T> QueryRunner<T> makeBaseRunner(
-      final Query<T> query,
-      final QueryToolChest<T, Query<T>> toolChest,
-      final QueryRunnerFactory<T, Query<T>> factory,
-      final Iterable<SegmentDescriptor> specs
-  )
+  private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Iterable<SegmentDescriptor> specs)
   {
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = getTimelineForTableDataSource(query);
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
+
     if (timeline == null) {
-      return new NoopQueryRunner<>();
+      return Collections.emptyList();
+    } else {
+      final List<WindowedSegment> retVal = new ArrayList<>();
+
+      for (SegmentDescriptor spec : specs) {
+        final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
+            spec.getInterval(),
+            spec.getVersion()
+        );
+        retVal.add(new WindowedSegment(entry.getChunk(spec.getPartitionNumber()).getObject(), spec.getInterval()));
+      }
+
+      return retVal;
+    }
+  }
+
+  public static class WindowedSegment
+  {
+    private final Segment segment;
+    private final Interval interval;
+
+    public WindowedSegment(Segment segment, Interval interval)
+    {
+      this.segment = segment;
+      this.interval = interval;
+      Preconditions.checkArgument(segment.getId().getInterval().contains(interval));
+    }
+
+    public Segment getSegment()
+    {
+      return segment;
+    }
+
+    public Interval getInterval()
+    {
+      return interval;
+    }
+
+    public SegmentDescriptor getDescriptor()
+    {
+      return new SegmentDescriptor(interval, segment.getId().getVersion(), segment.getId().getPartitionNum());
+    }
+  }
+
+  /**
+   * Mimics the behavior of a data server (e.g. Historical).
+   *
+   * Compare to {@link org.apache.druid.server.SegmentManager}.
+   */
+  private class DataServerLikeWalker implements QuerySegmentWalker
+  {
+    @Override
+    public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
+    {
+      final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
+
+      if (!analysis.isConcreteTableBased()) {
+        throw new ISE("Cannot handle datasource: %s", query.getDataSource());
+      }
+
+      final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
+
+      FunctionalIterable<SegmentDescriptor> segmentDescriptors = FunctionalIterable
+          .create(intervals)
+          .transformCat(interval -> getSegmentsForTable(dataSourceName, interval))
+          .transform(WindowedSegment::getDescriptor);
+
+      return getQueryRunnerForSegments(query, segmentDescriptors);
     }
 
-    return new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            factory.mergeRunners(
-                Execs.directExecutor(),
-                FunctionalIterable
-                    .create(specs)
-                    .transformCat(
-                        descriptor -> {
-                          final PartitionHolder<ReferenceCountingSegment> holder = timeline.findEntry(
-                              descriptor.getInterval(),
-                              descriptor.getVersion()
-                          );
-
-                          return Iterables.transform(
-                              holder,
-                              chunk -> new SpecificSegmentQueryRunner<T>(
-                                  factory.createRunner(chunk.getObject()),
-                                  new SpecificSegmentSpec(descriptor)
+    @Override
+    public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+    {
+      final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
+      if (factory == null) {
+        throw new ISE("Unknown query type[%s].", query.getClass());
+      }
+
+      final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
+
+      if (!analysis.isConcreteTableBased()) {
+        throw new ISE("Cannot handle datasource: %s", query.getDataSource());
+      }
+
+      final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
+
+      final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
+
+      // Make sure this query type can handle the subquery, if present.
+      if (analysis.isQuery()
+          && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
+        throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
+      }
+
+      final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
+          analysis.getPreJoinableClauses(),
+          joinableFactory,
+          new AtomicLong()
+      );
+
+      final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(
+          toolChest.postMergeQueryDecoration(
+              toolChest.mergeResults(
+                  toolChest.preMergeQueryDecoration(
+                      makeTableRunner(toolChest, factory, getSegmentsForTable(dataSourceName, specs), segmentMapFn)
+                  )
+              )
+          ),
+          toolChest
+      );
+
+      // Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
+      // This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
+      // to function properly.
+      return (theQuery, responseContext) -> baseRunner.run(
+          theQuery.withQuery(Queries.withSpecificSegments(theQuery.getQuery(), ImmutableList.copyOf(specs))),
+          responseContext
+      );
+    }
+
+    private <T> QueryRunner<T> makeTableRunner(
+        final QueryToolChest<T, Query<T>> toolChest,
+        final QueryRunnerFactory<T, Query<T>> factory,
+        final Iterable<WindowedSegment> segments,
+        final Function<Segment, Segment> segmentMapFn
+    )
+    {
+      final List<WindowedSegment> segmentsList = Lists.newArrayList(segments);
+
+      if (segmentsList.isEmpty()) {
+        // Note: this is not correct when there's a right or full outer join going on.
+        // See https://github.com/apache/druid/issues/9229 for details.
+        return new NoopQueryRunner<>();
+      }
+
+      return new FinalizeResultsQueryRunner<>(
+          toolChest.mergeResults(
+              factory.mergeRunners(
+                  Execs.directExecutor(),
+                  FunctionalIterable
+                      .create(segmentsList)
+                      .transform(
+                          segment ->
+                              new SpecificSegmentQueryRunner<>(
+                                  factory.createRunner(segmentMapFn.apply(segment.getSegment())),
+                                  new SpecificSegmentSpec(segment.getDescriptor())
                               )
-                          );
-                        }
-                    )
-            )
-        ),
-        toolChest
-    );
+                      )
+              )
+          ),
+          toolChest
+      );
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org