You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/19 06:28:56 UTC

[druid] branch master updated: Convert the Druid planner to use statement handlers (#12905)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ce03eb094 Convert the Druid planner to use statement handlers (#12905)
8ce03eb094 is described below

commit 8ce03eb094d6723b831ac03a067e2c4e3cf499fb
Author: Paul Rogers <pa...@users.noreply.github.com>
AuthorDate: Mon Sep 19 08:28:45 2022 +0200

    Convert the Druid planner to use statement handlers (#12905)
    
    * Converted Druid planner to use statement handlers
    
    Converts the large collection of if-statements for statement
    types into a set of classes: one per supported statement type.
    Cleans up a few error messages.
    
    * Revisions from review comments
    
    * Build fix
    
    * Build fix
    
    * Resolve merge conflict.
    
    * More merges with QueryResponse PR
    
    * More parameterized type cleanup
    
    Forces a rebuild due to a flaky test
---
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    |   2 +-
 .../org/apache/druid/msq/sql/SqlTaskResource.java  |   4 +-
 .../org/apache/druid/msq/exec/MSQInsertTest.java   |   2 +-
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  |   2 +-
 .../org/apache/druid/msq/exec/MSQSelectTest.java   |   8 +-
 .../org/apache/druid/server/QueryLifecycle.java    |  18 +-
 .../org/apache/druid/server/QueryResource.java     |   6 +-
 .../org/apache/druid/server/QueryResponse.java     |  18 +-
 sql/src/main/codegen/includes/insert.ftl           |   2 +-
 sql/src/main/codegen/includes/replace.ftl          |   2 +-
 .../java/org/apache/druid/sql/DirectStatement.java |   4 +-
 .../druid/sql/calcite/parser/DruidSqlIngest.java   |  72 ++
 .../druid/sql/calcite/parser/DruidSqlInsert.java   |  40 +-
 .../druid/sql/calcite/parser/DruidSqlReplace.java  |  40 +-
 .../druid/sql/calcite/planner/DruidPlanner.java    | 904 ++-------------------
 .../druid/sql/calcite/planner/IngestHandler.java   | 346 ++++++++
 .../druid/sql/calcite/planner/PlannerResult.java   |   6 +-
 .../druid/sql/calcite/planner/QueryHandler.java    | 675 +++++++++++++++
 .../sql/calcite/planner/SqlStatementHandler.java   |  76 ++
 .../apache/druid/sql/calcite/rel/DruidQuery.java   |   6 +-
 .../org/apache/druid/sql/calcite/rel/DruidRel.java |   4 +-
 .../druid/sql/calcite/rel/DruidUnionRel.java       |  14 +-
 .../druid/sql/calcite/run/NativeQueryMaker.java    |  22 +-
 .../apache/druid/sql/calcite/run/QueryMaker.java   |   2 +-
 .../org/apache/druid/sql/http/SqlResource.java     |   2 +-
 .../druid/sql/calcite/CalciteInsertDmlTest.java    |  24 +-
 .../druid/sql/calcite/CalciteReplaceDmlTest.java   |  16 +-
 .../druid/sql/calcite/TestInsertQueryMaker.java    |   2 +-
 .../org/apache/druid/sql/http/SqlResourceTest.java |  10 +-
 29 files changed, 1334 insertions(+), 995 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index e2754cacbe..c1611f52db 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -105,7 +105,7 @@ public class MSQTaskQueryMaker implements QueryMaker
   }
 
   @Override
-  public QueryResponse runQuery(final DruidQuery druidQuery)
+  public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
   {
     String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId());
 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java
index 1725060806..f0cd7318f6 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java
@@ -147,8 +147,8 @@ public class SqlTaskResource
     final String sqlQueryId = stmt.sqlQueryId();
     try {
       final DirectStatement.ResultSet plan = stmt.plan();
-      final QueryResponse response = plan.run();
-      final Sequence sequence = response.getResults();
+      final QueryResponse<Object[]> response = plan.run();
+      final Sequence<Object[]> sequence = response.getResults();
       final SqlRowTransformer rowTransformer = plan.createRowTransformer();
       final boolean isTaskStruct = MSQTaskSqlEngine.TASK_STRUCT_FIELD_NAMES.equals(rowTransformer.getFieldList());
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index a1934f070a..18ad9f050d 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -439,7 +439,7 @@ public class MSQInsertTest extends MSQTestBase
                      .setExpectedValidationErrorMatcher(CoreMatchers.allOf(
                          CoreMatchers.instanceOf(SqlPlanningException.class),
                          ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
-                             "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"))
+                             "CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"))
                      ))
                      .verifyPlanningErrors();
   }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 131a9fa91b..ca7d5d3203 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -187,7 +187,7 @@ public class MSQReplaceTest extends MSQTestBase
                          CoreMatchers.allOf(
                              CoreMatchers.instanceOf(SqlPlanningException.class),
                              ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
-                                 "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL."))
+                                 "Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table."))
                          )
                      )
                      .verifyPlanningErrors();
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index 10c67e27a2..12b353268a 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -697,7 +697,7 @@ public class MSQSelectTest extends MSQTestBase
             CoreMatchers.allOf(
                 CoreMatchers.instanceOf(SqlPlanningException.class),
                 ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
-                    "Cannot query table [INFORMATION_SCHEMA.SCHEMATA] with SQL engine 'msq-task'."))
+                    "Cannot query table INFORMATION_SCHEMA.SCHEMATA with SQL engine 'msq-task'."))
             )
         )
         .verifyPlanningErrors();
@@ -712,7 +712,7 @@ public class MSQSelectTest extends MSQTestBase
             CoreMatchers.allOf(
                 CoreMatchers.instanceOf(SqlPlanningException.class),
                 ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
-                    "Cannot query table [sys.segments] with SQL engine 'msq-task'."))
+                    "Cannot query table sys.segments with SQL engine 'msq-task'."))
             )
         )
         .verifyPlanningErrors();
@@ -727,7 +727,7 @@ public class MSQSelectTest extends MSQTestBase
             CoreMatchers.allOf(
                 CoreMatchers.instanceOf(SqlPlanningException.class),
                 ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
-                    "Cannot query table [sys.segments] with SQL engine 'msq-task'."))
+                    "Cannot query table sys.segments with SQL engine 'msq-task'."))
             )
         )
         .verifyPlanningErrors();
@@ -743,7 +743,7 @@ public class MSQSelectTest extends MSQTestBase
             CoreMatchers.allOf(
                 CoreMatchers.instanceOf(SqlPlanningException.class),
                 ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
-                    "Cannot query table [sys.segments] with SQL engine 'msq-task'."))
+                    "Cannot query table sys.segments with SQL engine 'msq-task'."))
             )
         )
         .verifyPlanningErrors();
diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
index b4d80d01d9..40e5267b80 100644
--- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
+++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
@@ -128,7 +128,6 @@ public class QueryLifecycle
     this.startNs = startNs;
   }
 
-
   /**
    * For callers who have already authorized their query, and where simplicity is desired over flexibility. This method
    * does it all in one call. Logs and metrics are emitted when the Sequence is either fully iterated or throws an
@@ -140,8 +139,7 @@ public class QueryLifecycle
    *
    * @return results
    */
-  @SuppressWarnings("unchecked")
-  public <T> QueryResponse runSimple(
+  public <T> QueryResponse<T> runSimple(
       final Query<T> query,
       final AuthenticationResult authenticationResult,
       final Access authorizationResult
@@ -151,7 +149,7 @@ public class QueryLifecycle
 
     final Sequence<T> results;
 
-    final QueryResponse queryResponse;
+    final QueryResponse<T> queryResponse;
     try {
       preAuthorized(authenticationResult, authorizationResult);
       if (!authorizationResult.isAllowed()) {
@@ -172,7 +170,7 @@ public class QueryLifecycle
      * cannot be moved into execute().  We leave this as an exercise for the future, however as this oddity
      * was discovered while just trying to expose HTTP response headers
      */
-    return new QueryResponse(
+    return new QueryResponse<T>(
         Sequences.wrap(
             results,
             new SequenceWrapper()
@@ -193,8 +191,7 @@ public class QueryLifecycle
    *
    * @param baseQuery the query
    */
-  @SuppressWarnings("unchecked")
-  public void initialize(final Query baseQuery)
+  public void initialize(final Query<?> baseQuery)
   {
     transition(State.NEW, State.INITIALIZED);
 
@@ -282,17 +279,18 @@ public class QueryLifecycle
    *
    * @return result sequence and response context
    */
-  public QueryResponse execute()
+  public <T> QueryResponse<T> execute()
   {
     transition(State.AUTHORIZED, State.EXECUTING);
 
     final ResponseContext responseContext = DirectDruidClient.makeResponseContextForQuery();
 
-    final Sequence<?> res = QueryPlus.wrap(baseQuery)
+    @SuppressWarnings("unchecked")
+    final Sequence<T> res = QueryPlus.wrap((Query<T>) baseQuery)
                                   .withIdentity(authenticationResult.getIdentity())
                                   .run(texasRanger, responseContext);
 
-    return new QueryResponse(res == null ? Sequences.empty() : res, responseContext);
+    return new QueryResponse<T>(res == null ? Sequences.empty() : res, responseContext);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java
index ea225efbae..1a72cfc3b8 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -200,7 +200,7 @@ public class QueryResource implements QueryCountStatsProvider
         throw new ForbiddenException(authResult.toString());
       }
 
-      final QueryResponse queryResponse = queryLifecycle.execute();
+      final QueryResponse<?> queryResponse = queryLifecycle.execute();
       final Sequence<?> results = queryResponse.getResults();
       final ResponseContext responseContext = queryResponse.getResponseContext();
       final String prevEtag = getPreviousEtag(req);
@@ -477,8 +477,8 @@ public class QueryResource implements QueryCountStatsProvider
     }
 
     ObjectWriter newOutputWriter(
-        @Nullable QueryToolChest toolChest,
-        @Nullable Query query,
+        @Nullable QueryToolChest<?, Query<?>> toolChest,
+        @Nullable Query<?> query,
         boolean serializeDateTimeAsLong
     )
     {
diff --git a/server/src/main/java/org/apache/druid/server/QueryResponse.java b/server/src/main/java/org/apache/druid/server/QueryResponse.java
index 69908aee94..d590c5014e 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResponse.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResponse.java
@@ -22,23 +22,23 @@ package org.apache.druid.server;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.context.ResponseContext;
 
-public class QueryResponse
+public class QueryResponse<T>
 {
-  public static QueryResponse withEmptyContext(Sequence results)
-  {
-    return new QueryResponse(results, ResponseContext.createEmpty());
-  }
-
-  private final Sequence results;
+  private final Sequence<T> results;
   private final ResponseContext responseContext;
 
-  public QueryResponse(final Sequence results, final ResponseContext responseContext)
+  public QueryResponse(final Sequence<T> results, final ResponseContext responseContext)
   {
     this.results = results;
     this.responseContext = responseContext;
   }
 
-  public Sequence getResults()
+  public static <T> QueryResponse<T> withEmptyContext(Sequence<T> results)
+  {
+    return new QueryResponse<T>(results, ResponseContext.createEmpty());
+  }
+
+  public Sequence<T> getResults()
   {
     return results;
   }
diff --git a/sql/src/main/codegen/includes/insert.ftl b/sql/src/main/codegen/includes/insert.ftl
index 07898aaadc..c0e04bc772 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/insert.ftl
@@ -38,7 +38,7 @@ SqlNode DruidSqlInsertEof() :
   ]
   {
       if (clusteredBy != null && partitionedBy.lhs == null) {
-        throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause");
+        throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause");
       }
   }
   // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
diff --git a/sql/src/main/codegen/includes/replace.ftl b/sql/src/main/codegen/includes/replace.ftl
index ed9eee46de..ed8dbb10ee 100644
--- a/sql/src/main/codegen/includes/replace.ftl
+++ b/sql/src/main/codegen/includes/replace.ftl
@@ -58,7 +58,7 @@ SqlNode DruidSqlReplaceEof() :
     ]
     {
         if (clusteredBy != null && partitionedBy.lhs == null) {
-          throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause");
+          throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause");
         }
     }
     // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
index e173213a0c..507216c236 100644
--- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
@@ -98,7 +98,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
      * Do the actual execute step which allows subclasses to wrap the sequence,
      * as is sometimes needed for testing.
      */
-    public QueryResponse run()
+    public QueryResponse<Object[]> run()
     {
       try {
         // Check cancellation. Required for SqlResourceTest to work.
@@ -176,7 +176,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
    *
    * @return sequence which delivers query results
    */
-  public QueryResponse execute()
+  public QueryResponse<Object[]> execute()
   {
     return plan().run();
   }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java
new file mode 100644
index 0000000000..26f019e0a1
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.java.util.common.granularity.Granularity;
+
+import javax.annotation.Nullable;
+
+/**
+ * Common base class to the two Druid "ingest" statements: INSERT and REPLACE.
+ * Allows Planner code to work with these two statements generically where they
+ * share common clauses.
+ */
+public abstract class DruidSqlIngest extends SqlInsert
+{
+  protected final Granularity partitionedBy;
+
+  // Used in the unparse function to generate the original query since we convert the string to an enum
+  protected final String partitionedByStringForUnparse;
+
+  @Nullable
+  protected final SqlNodeList clusteredBy;
+
+  public DruidSqlIngest(SqlParserPos pos,
+      SqlNodeList keywords,
+      SqlNode targetTable,
+      SqlNode source,
+      SqlNodeList columnList,
+      @Nullable Granularity partitionedBy,
+      @Nullable String partitionedByStringForUnparse,
+      @Nullable SqlNodeList clusteredBy
+  )
+  {
+    super(pos, keywords, targetTable, source, columnList);
+
+    this.partitionedByStringForUnparse = partitionedByStringForUnparse;
+    this.partitionedBy = partitionedBy;
+    this.clusteredBy = clusteredBy;
+  }
+
+  public Granularity getPartitionedBy()
+  {
+    return partitionedBy;
+  }
+
+  @Nullable
+  public SqlNodeList getClusteredBy()
+  {
+    return clusteredBy;
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
index dbaace1d93..c2eeb2ed1e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.sql.calcite.parser;
 
-import com.google.common.base.Preconditions;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
@@ -32,24 +31,16 @@ import javax.annotation.Nullable;
 
 /**
  * Extends the 'insert' call to hold custom parameters specific to Druid i.e. PARTITIONED BY and CLUSTERED BY
- * This class extends the {@link SqlInsert} so that this SqlNode can be used in
+ * This class extends the {@link DruidSqlIngest} so that this SqlNode can be used in
  * {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing
  */
-public class DruidSqlInsert extends SqlInsert
+public class DruidSqlInsert extends DruidSqlIngest
 {
   public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity";
 
   // This allows reusing super.unparse
   public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
 
-  private final Granularity partitionedBy;
-
-  // Used in the unparse function to generate the original query since we convert the string to an enum
-  private final String partitionedByStringForUnparse;
-
-  @Nullable
-  private final SqlNodeList clusteredBy;
-
   /**
    * While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
    * disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
@@ -61,35 +52,18 @@ public class DruidSqlInsert extends SqlInsert
       @Nullable Granularity partitionedBy,
       @Nullable String partitionedByStringForUnparse,
       @Nullable SqlNodeList clusteredBy
-  ) throws ParseException
+  )
   {
     super(
         insertNode.getParserPosition(),
         (SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
         insertNode.getTargetTable(),
         insertNode.getSource(),
-        insertNode.getTargetColumnList()
+        insertNode.getTargetColumnList(),
+        partitionedBy,
+        partitionedByStringForUnparse,
+        clusteredBy
     );
-    if (partitionedBy == null) {
-      throw new ParseException("INSERT statements must specify PARTITIONED BY clause explicitly");
-    }
-    this.partitionedBy = partitionedBy;
-
-    Preconditions.checkNotNull(partitionedByStringForUnparse);
-    this.partitionedByStringForUnparse = partitionedByStringForUnparse;
-
-    this.clusteredBy = clusteredBy;
-  }
-
-  @Nullable
-  public SqlNodeList getClusteredBy()
-  {
-    return clusteredBy;
-  }
-
-  public Granularity getPartitionedBy()
-  {
-    return partitionedBy;
   }
 
   @Nonnull
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
index fb41bf7656..d527a08b59 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.sql.calcite.parser;
 
-import com.google.common.base.Preconditions;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
@@ -38,22 +37,14 @@ import javax.annotation.Nullable;
  * This class extends the {@link SqlInsert} so that this SqlNode can be used in
  * {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing
  */
-public class DruidSqlReplace extends SqlInsert
+public class DruidSqlReplace extends DruidSqlIngest
 {
   public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks";
 
   public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", SqlKind.OTHER);
 
-  private final Granularity partitionedBy;
-
-  // Used in the unparse function to generate the original query since we convert the string to an enum
-  private final String partitionedByStringForUnparse;
-
   private final SqlNode replaceTimeQuery;
 
-  @Nullable
-  private final SqlNodeList clusteredBy;
-
   /**
    * While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
    * disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
@@ -66,28 +57,20 @@ public class DruidSqlReplace extends SqlInsert
       @Nullable String partitionedByStringForUnparse,
       @Nullable SqlNodeList clusteredBy,
       @Nullable SqlNode replaceTimeQuery
-  ) throws ParseException
+  )
   {
     super(
         insertNode.getParserPosition(),
         (SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
         insertNode.getTargetTable(),
         insertNode.getSource(),
-        insertNode.getTargetColumnList()
+        insertNode.getTargetColumnList(),
+        partitionedBy,
+        partitionedByStringForUnparse,
+        clusteredBy
     );
-    if (replaceTimeQuery == null) {
-      throw new ParseException("Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.");
-    }
-    if (partitionedBy == null) {
-      throw new ParseException("REPLACE statements must specify PARTITIONED BY clause explicitly");
-    }
-    this.partitionedBy = partitionedBy;
-
-    this.partitionedByStringForUnparse = Preconditions.checkNotNull(partitionedByStringForUnparse);
 
     this.replaceTimeQuery = replaceTimeQuery;
-
-    this.clusteredBy = clusteredBy;
   }
 
   public SqlNode getReplaceTimeQuery()
@@ -95,17 +78,6 @@ public class DruidSqlReplace extends SqlInsert
     return replaceTimeQuery;
   }
 
-  public Granularity getPartitionedBy()
-  {
-    return partitionedBy;
-  }
-
-  @Nullable
-  public SqlNodeList getClusteredBy()
-  {
-    return clusteredBy;
-  }
-
   @Nonnull
   @Override
   public SqlOperator getOperator()
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index 75be75855c..68db56dd37 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -19,90 +19,31 @@
 
 package org.apache.druid.sql.calcite.planner;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.BindableConvention;
-import org.apache.calcite.interpreter.BindableRel;
-import org.apache.calcite.interpreter.Bindables;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlExplain;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.Pair;
-import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.query.Query;
-import org.apache.druid.segment.DimensionHandlerUtils;
-import org.apache.druid.server.QueryResponse;
+import org.apache.druid.query.QueryContext;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
-import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
 import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
-import org.apache.druid.sql.calcite.rel.DruidConvention;
-import org.apache.druid.sql.calcite.rel.DruidQuery;
-import org.apache.druid.sql.calcite.rel.DruidRel;
-import org.apache.druid.sql.calcite.rel.DruidUnionRel;
-import org.apache.druid.sql.calcite.run.EngineFeature;
-import org.apache.druid.sql.calcite.run.QueryMaker;
 import org.apache.druid.sql.calcite.run.SqlEngine;
-import org.apache.druid.sql.calcite.table.DruidTable;
-import org.apache.druid.utils.Throwables;
 import org.joda.time.DateTimeZone;
 
-import javax.annotation.Nullable;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 /**
  * Druid SQL planner. Wraps the underlying Calcite planner with Druid-specific
@@ -121,27 +62,13 @@ public class DruidPlanner implements Closeable
     START, VALIDATED, PREPARED, PLANNED
   }
 
-  private static final EmittingLogger log = new EmittingLogger(DruidPlanner.class);
-  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
-  @VisibleForTesting
-  public static final String UNNAMED_INGESTION_COLUMN_ERROR =
-      "Cannot ingest expressions that do not have an alias "
-          + "or columns with names like EXPR$[digit].\n"
-          + "E.g. if you are ingesting \"func(X)\", then you can rewrite it as "
-          + "\"func(X) as myColumn\"";
-
   private final FrameworkConfig frameworkConfig;
   private final CalcitePlanner planner;
   private final PlannerContext plannerContext;
   private final SqlEngine engine;
   private State state = State.START;
-  private ParsedNodes parsed;
-  private SqlNode validatedQueryNode;
+  private SqlStatementHandler handler;
   private boolean authorized;
-  private PrepareResult prepareResult;
-  private Set<ResourceAction> resourceActions;
-  private RelRoot rootQueryRel;
-  private RexBuilder rexBuilder;
 
   DruidPlanner(
       final FrameworkConfig frameworkConfig,
@@ -170,80 +97,44 @@ public class DruidPlanner implements Closeable
 
     // Parse the query string.
     SqlNode root = planner.parse(plannerContext.getSql());
-    parsed = ParsedNodes.create(root, plannerContext.getTimeZone());
-
-    if (parsed.isSelect() && !plannerContext.engineHasFeature(EngineFeature.CAN_SELECT)) {
-      throw new ValidationException(StringUtils.format("Cannot execute SELECT with SQL engine '%s'.", engine.name()));
-    } else if (parsed.isInsert() && !plannerContext.engineHasFeature(EngineFeature.CAN_INSERT)) {
-      throw new ValidationException(StringUtils.format("Cannot execute INSERT with SQL engine '%s'.", engine.name()));
-    } else if (parsed.isReplace() && !plannerContext.engineHasFeature(EngineFeature.CAN_REPLACE)) {
-      throw new ValidationException(StringUtils.format("Cannot execute REPLACE with SQL engine '%s'.", engine.name()));
-    }
+    handler = createHandler(root);
 
     try {
-      if (parsed.getIngestionGranularity() != null) {
-        plannerContext.getQueryContext().addSystemParam(
-            DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
-            plannerContext.getJsonMapper().writeValueAsString(parsed.getIngestionGranularity())
-        );
-      }
-    }
-    catch (JsonProcessingException e) {
-      throw new ValidationException("Unable to serialize partition granularity.");
-    }
-
-    if (parsed.getReplaceIntervals() != null) {
-      plannerContext.getQueryContext().addSystemParam(
-          DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
-          String.join(",", parsed.getReplaceIntervals())
-      );
-    }
-
-    try {
-      // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any
-      // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral}
-      // replacement.
-      //
-      // Parameter replacement is done only if the client provides parameter values.
-      // If this is a PREPARE-only, then there will be no values even if the statement contains
-      // parameters. If this is a PLAN, then we'll catch later the case that the statement
-      // contains parameters, but no values were provided.
-      SqlNode queryNode = parsed.getQueryNode();
-      if (!plannerContext.getParameters().isEmpty()) {
-        queryNode = queryNode.accept(new SqlParameterizerShuttle(plannerContext));
-      }
-      validatedQueryNode = planner.validate(queryNode);
+      handler.validate();
+      plannerContext.setResourceActions(handler.resourceActions());
     }
     catch (RuntimeException e) {
       throw new ValidationException(e);
     }
 
-    final SqlValidator validator = planner.getValidator();
-    SqlResourceCollectorShuttle resourceCollectorShuttle = new SqlResourceCollectorShuttle(validator, plannerContext);
-    validatedQueryNode.accept(resourceCollectorShuttle);
-
-    resourceActions = new HashSet<>(resourceCollectorShuttle.getResourceActions());
+    state = State.VALIDATED;
+  }
 
-    if (parsed.isInsert() || parsed.isReplace()) {
-      // Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes
-      // the number of rows inserted to be limited which is likely to be confusing and unintended.
-      if (plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) {
-        throw new ValidationException(
-            StringUtils.format(
-                "%s cannot be provided with %s.",
-                PlannerContext.CTX_SQL_OUTER_LIMIT,
-                parsed.getInsertOrReplace().getOperator().getName()
-            )
-        );
+  private SqlStatementHandler createHandler(final SqlNode node) throws ValidationException
+  {
+    SqlNode query = node;
+    SqlExplain explain = null;
+    if (query.getKind() == SqlKind.EXPLAIN) {
+      explain = (SqlExplain) query;
+      query = explain.getExplicandum();
+    }
+
+    SqlStatementHandler.HandlerContext handlerContext = new HandlerContextImpl();
+    if (query.getKind() == SqlKind.INSERT) {
+      if (query instanceof DruidSqlInsert) {
+        return new IngestHandler.InsertHandler(handlerContext, (DruidSqlInsert) query, explain);
+      } else if (query instanceof DruidSqlReplace) {
+        return new IngestHandler.ReplaceHandler(handlerContext, (DruidSqlReplace) query, explain);
       }
-      final String targetDataSource = validateAndGetDataSourceForIngest(parsed.getInsertOrReplace());
-      resourceActions.add(new ResourceAction(new Resource(targetDataSource, ResourceType.DATASOURCE), Action.WRITE));
     }
 
-    state = State.VALIDATED;
-    plannerContext.setResourceActions(resourceActions);
+    if (query.isA(SqlKind.QUERY)) {
+      return new QueryHandler.SelectHandler(handlerContext, query, explain);
+    }
+    throw new ValidationException(StringUtils.format("Cannot execute [%s].", node.getKind()));
   }
 
+
   /**
    * Prepare a SQL query for execution, including some initial parsing and
    * validation and any dynamic parameter type resolution, to support prepared
@@ -260,30 +151,9 @@ public class DruidPlanner implements Closeable
   public PrepareResult prepare()
   {
     Preconditions.checkState(state == State.VALIDATED);
-
-    rootQueryRel = planner.rel(validatedQueryNode);
-    doPrepare();
+    handler.prepare();
     state = State.PREPARED;
-    return prepareResult;
-  }
-
-  private void doPrepare()
-  {
-    final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
-    final SqlValidator validator = planner.getValidator();
-    final RelDataType parameterTypes = validator.getParameterRowType(validatedQueryNode);
-    final RelDataType returnedRowType;
-
-    if (parsed.getExplainNode() != null) {
-      returnedRowType = getExplainStructType(typeFactory);
-    } else if (parsed.isSelect()) {
-      returnedRowType = engine.resultTypeForSelect(typeFactory, rootQueryRel.validatedRowType);
-    } else {
-      assert parsed.insertOrReplace != null;
-      returnedRowType = engine.resultTypeForInsert(typeFactory, rootQueryRel.validatedRowType);
-    }
-
-    prepareResult = new PrepareResult(rootQueryRel.validatedRowType, returnedRowType, parameterTypes);
+    return prepareResult();
   }
 
   /**
@@ -309,14 +179,14 @@ public class DruidPlanner implements Closeable
 
   /**
    * Return the resource actions corresponding to the datasources and views which
-   * an authenticated request must be authorized for to process the
-   * query. The actions will be {@code null} if the
-   * planner has not yet advanced to the validation step. This may occur if
-   * validation fails and the caller accesses the resource
-   * actions as part of clean-up.
+   * an authenticated request must be authorized for, in order to process the query. The
+   * actions will be {@code null} if the planner has not yet advanced to the
+   * validation step. This may occur if validation fails and the caller accesses
+   * the resource actions as part of clean-up.
    */
   public Set<ResourceAction> resourceActions(boolean includeContext)
   {
+    Set<ResourceAction> resourceActions = plannerContext.getResourceActions();
     if (includeContext) {
       Set<ResourceAction> actions = new HashSet<>(resourceActions);
       plannerContext.getQueryContext().getUserParams().keySet().forEach(contextParam -> actions.add(
@@ -330,69 +200,18 @@ public class DruidPlanner implements Closeable
 
   /**
    * Plan an SQL query for execution, returning a {@link PlannerResult} which can be used to actually execute the query.
-   *
-   * Ideally, the query can be planned into a native Druid query, using {@link #planWithDruidConvention}, but will
-   * fall-back to {@link #planWithBindableConvention} if this is not possible.
-   *
-   * Planning reuses the validation done in `validate()` which must be called first.
+   * <p>
+   * Ideally, the query is planned into a native Druid query; planning
+   * falls back to the bindable convention if that is not possible.
+   * <p>
+   * Planning reuses the validation done in {@code validate()} which must be called first.
    */
   public PlannerResult plan() throws ValidationException
   {
     Preconditions.checkState(state == State.VALIDATED || state == State.PREPARED);
     Preconditions.checkState(authorized);
-    if (state == State.VALIDATED) {
-      rootQueryRel = planner.rel(validatedQueryNode);
-    }
-
-    final Set<RelOptTable> bindableTables = getBindableTables(rootQueryRel.rel);
-
-    // the planner's type factory is not available until after parsing
-    this.rexBuilder = new RexBuilder(planner.getTypeFactory());
     state = State.PLANNED;
-
-    try {
-      if (!bindableTables.isEmpty()) {
-        // Consider BINDABLE convention when necessary. Used for metadata tables.
-
-        if (parsed.isInsert() || parsed.isReplace()) {
-          // Throws ValidationException if the target table is itself bindable.
-          validateAndGetDataSourceForIngest(parsed.getInsertOrReplace());
-        }
-
-        if (!plannerContext.engineHasFeature(EngineFeature.ALLOW_BINDABLE_PLAN)) {
-          throw new ValidationException(
-              StringUtils.format(
-                  "Cannot query table%s [%s] with SQL engine '%s'.",
-                  bindableTables.size() != 1 ? "s" : "",
-                  bindableTables.stream()
-                                .map(table -> Joiner.on(".").join(table.getQualifiedName()))
-                                .collect(Collectors.joining(", ")),
-                  engine.name()
-              )
-          );
-        }
-
-        return planWithBindableConvention(rootQueryRel, parsed.getExplainNode());
-      } else {
-        // DRUID convention is used whenever there are no tables that require BINDABLE.
-        return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), parsed.getInsertOrReplace());
-      }
-    }
-    catch (Exception e) {
-      Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class);
-      if (null == cannotPlanException) {
-        // Not a CannotPlanException, rethrow without logging.
-        throw e;
-      }
-
-      Logger logger = log;
-      if (!plannerContext.getQueryContext().isDebug()) {
-        logger = log.noStackTrace();
-      }
-      String errorMessage = buildSQLPlanningErrorMessage(cannotPlanException);
-      logger.warn(e, errorMessage);
-      throw new UnsupportedSQLQueryException(errorMessage);
-    }
+    return handler.plan();
   }
 
   public PlannerContext getPlannerContext()
@@ -402,7 +221,7 @@ public class DruidPlanner implements Closeable
 
   public PrepareResult prepareResult()
   {
-    return prepareResult;
+    return handler.prepareResult();
   }
 
   @Override
@@ -411,647 +230,48 @@ public class DruidPlanner implements Closeable
     planner.close();
   }
 
-  /**
-   * Construct a {@link PlannerResult} for a {@link RelNode} that is directly translatable to a native Druid query.
-   */
-  private PlannerResult planWithDruidConvention(
-      final RelRoot root,
-      @Nullable final SqlExplain explain,
-      @Nullable final SqlInsert insertOrReplace
-  ) throws ValidationException
+  protected class HandlerContextImpl implements SqlStatementHandler.HandlerContext
   {
-    final RelRoot possiblyLimitedRoot = possiblyWrapRootWithOuterLimitFromContext(root);
-    final QueryMaker queryMaker = buildQueryMaker(possiblyLimitedRoot, insertOrReplace);
-    plannerContext.setQueryMaker(queryMaker);
-    if (prepareResult == null) {
-      doPrepare();
-    }
-
-    // Fall-back dynamic parameter substitution using {@link RelParameterizerShuttle}
-    // in the event that {@link #rewriteDynamicParameters(SqlNode)} was unable to
-    // successfully substitute all parameter values, and will cause a failure if any
-    // dynamic a parameters are not bound. This occurs at least for DATE parameters
-    // with integer values.
-    //
-    // This check also catches the case where we did not do a parameter check earlier
-    // because no values were provided. (Values are not required in the PREPARE case
-    // but now that we're planning, we require them.)
-    RelNode parameterized = possiblyLimitedRoot.rel.accept(
-        new RelParameterizerShuttle(plannerContext)
-    );
-    final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
-        CalciteRulesManager.DRUID_CONVENTION_RULES,
-        planner.getEmptyTraitSet()
-               .replace(DruidConvention.instance())
-               .plus(root.collation),
-        parameterized
-    );
-
-    if (explain != null) {
-      return planExplanation(druidRel, explain, true);
-    } else {
-      // Compute row type.
-      final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
-      final RelDataType rowType;
-
-      if (parsed.isSelect()) {
-        rowType = engine.resultTypeForSelect(typeFactory, rootQueryRel.validatedRowType);
-      } else {
-        assert parsed.insertOrReplace != null;
-        rowType = engine.resultTypeForInsert(typeFactory, rootQueryRel.validatedRowType);
-      }
-
-      // Start the query.
-      final Supplier<QueryResponse> resultsSupplier = () -> {
-        // sanity check
-        final Set<ResourceAction> readResourceActions =
-            plannerContext.getResourceActions()
-                          .stream()
-                          .filter(action -> action.getAction() == Action.READ)
-                          .collect(Collectors.toSet());
-        Preconditions.checkState(
-            readResourceActions.isEmpty() == druidRel.getDataSourceNames().isEmpty()
-            // The resources found in the plannerContext can be less than the datasources in
-            // the query plan, because the query planner can eliminate empty tables by replacing
-            // them with InlineDataSource of empty rows.
-            || readResourceActions.size() >= druidRel.getDataSourceNames().size(),
-            "Authorization sanity check failed"
-        );
-
-        return druidRel.runQuery();
-      };
-
-      return new PlannerResult(resultsSupplier, rowType);
-    }
-  }
-
-  /**
-   * Construct a {@link PlannerResult} for a fall-back 'bindable' rel, for
-   * things that are not directly translatable to native Druid queries such
-   * as system tables and just a general purpose (but definitely not optimized)
-   * fall-back.
-   *
-   * See {@link #planWithDruidConvention} which will handle things which are
-   * directly translatable to native Druid queries.
-   *
-   * The bindable path handles parameter substitution of any values not
-   * bound by the earlier steps.
-   */
-  private PlannerResult planWithBindableConvention(
-      final RelRoot root,
-      @Nullable final SqlExplain explain
-  )
-  {
-    if (prepareResult == null) {
-      doPrepare();
-    }
-
-    BindableRel bindableRel = (BindableRel) planner.transform(
-        CalciteRulesManager.BINDABLE_CONVENTION_RULES,
-        planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(root.collation),
-        root.rel
-    );
-
-    if (!root.isRefTrivial()) {
-      // Add a projection on top to accommodate root.fields.
-      final List<RexNode> projects = new ArrayList<>();
-      final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder();
-      for (int field : Pair.left(root.fields)) {
-        projects.add(rexBuilder.makeInputRef(bindableRel, field));
-      }
-      bindableRel = new Bindables.BindableProject(
-          bindableRel.getCluster(),
-          bindableRel.getTraitSet(),
-          bindableRel,
-          projects,
-          root.validatedRowType
-      );
-    }
-
-    if (explain != null) {
-      return planExplanation(bindableRel, explain, false);
-    } else {
-      final BindableRel theRel = bindableRel;
-      final DataContext dataContext = plannerContext.createDataContext(
-              planner.getTypeFactory(),
-              plannerContext.getParameters()
-      );
-      final Supplier<QueryResponse> resultsSupplier = () -> {
-        final Enumerable<?> enumerable = theRel.bind(dataContext);
-        final Enumerator<?> enumerator = enumerable.enumerator();
-        return QueryResponse.withEmptyContext(Sequences.withBaggage(
-            new BaseSequence<>(
-                new BaseSequence.IteratorMaker<Object[], EnumeratorIterator<Object[]>>()
-                {
-                  @Override
-                  public EnumeratorIterator<Object[]> make()
-                  {
-                    return new EnumeratorIterator<>(new Iterator<Object[]>()
-                    {
-                      @Override
-                      public boolean hasNext()
-                      {
-                        return enumerator.moveNext();
-                      }
-
-                      @Override
-                      public Object[] next()
-                      {
-                        return (Object[]) enumerator.current();
-                      }
-                    });
-                  }
-
-                  @Override
-                  public void cleanup(EnumeratorIterator<Object[]> iterFromMake)
-                  {
-
-                  }
-                }
-            ), enumerator::close)
-        );
-      };
-      return new PlannerResult(resultsSupplier, root.validatedRowType);
-    }
-  }
-
-  /**
-   * Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode}
-   */
-  private PlannerResult planExplanation(
-      final RelNode rel,
-      final SqlExplain explain,
-      final boolean isDruidConventionExplanation
-  )
-  {
-    String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel());
-    String resourcesString;
-    try {
-      if (isDruidConventionExplanation && rel instanceof DruidRel) {
-        // Show the native queries instead of Calcite's explain if the legacy flag is turned off
-        if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) {
-          DruidRel<?> druidRel = (DruidRel<?>) rel;
-          try {
-            explanation = explainSqlPlanAsNativeQueries(druidRel);
-          }
-          catch (Exception ex) {
-            log.warn(ex, "Unable to translate to a native Druid query. Resorting to legacy Druid explain plan");
-          }
-        }
-      }
-      final Set<Resource> resources =
-          plannerContext.getResourceActions().stream().map(ResourceAction::getResource).collect(Collectors.toSet());
-      resourcesString = plannerContext.getJsonMapper().writeValueAsString(resources);
-    }
-    catch (JsonProcessingException jpe) {
-      // this should never happen, we create the Resources here, not a user
-      log.error(jpe, "Encountered exception while serializing Resources for explain output");
-      resourcesString = null;
-    }
-    final Supplier<QueryResponse> resultsSupplier = Suppliers.ofInstance(
-        QueryResponse.withEmptyContext(Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString})))
-    );
-    return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory()));
-  }
-
-  /**
-   * This method doesn't utilize the Calcite's internal {@link RelOptUtil#dumpPlan} since that tends to be verbose
-   * and not indicative of the native Druid Queries which will get executed
-   * This method assumes that the Planner has converted the RelNodes to DruidRels, and thereby we can implicitly cast it
-   *
-   * @param rel Instance of the root {@link DruidRel} which is formed by running the planner transformations on it
-   * @return A string representing an array of native queries that correspond to the given SQL query, in JSON format
-   * @throws JsonProcessingException
-   */
-  private String explainSqlPlanAsNativeQueries(DruidRel<?> rel) throws JsonProcessingException
-  {
-    ObjectMapper jsonMapper = plannerContext.getJsonMapper();
-    List<DruidQuery> druidQueryList;
-    druidQueryList = flattenOutermostRel(rel)
-        .stream()
-        .map(druidRel -> druidRel.toDruidQuery(false))
-        .collect(Collectors.toList());
-
-
-    // Putting the queries as object node in an ArrayNode, since directly returning a list causes issues when
-    // serializing the "queryType". Another method would be to create a POJO containing query and signature, and then
-    // serializing it using normal list method.
-    ArrayNode nativeQueriesArrayNode = jsonMapper.createArrayNode();
-
-    for (DruidQuery druidQuery : druidQueryList) {
-      Query<?> nativeQuery = druidQuery.getQuery();
-      ObjectNode objectNode = jsonMapper.createObjectNode();
-      objectNode.put("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class));
-      objectNode.put("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class));
-      nativeQueriesArrayNode.add(objectNode);
-    }
-
-    return jsonMapper.writeValueAsString(nativeQueriesArrayNode);
-  }
-
-  /**
-   * Given a {@link DruidRel}, this method recursively flattens the Rels if they are of the type {@link DruidUnionRel}
-   * It is implicitly assumed that the {@link DruidUnionRel} can never be the child of a non {@link DruidUnionRel}
-   * node
-   * For eg, a DruidRel structure of kind:
-   * DruidUnionRel
-   *  DruidUnionRel
-   *    DruidRel (A)
-   *    DruidRel (B)
-   *  DruidRel(C)
-   * will return [DruidRel(A), DruidRel(B), DruidRel(C)]
-   *
-   * @param outermostDruidRel The outermost rel which is to be flattened
-   * @return a list of DruidRel's which donot have a DruidUnionRel nested in between them
-   */
-  private List<DruidRel<?>> flattenOutermostRel(DruidRel<?> outermostDruidRel)
-  {
-    List<DruidRel<?>> druidRels = new ArrayList<>();
-    flattenOutermostRel(outermostDruidRel, druidRels);
-    return druidRels;
-  }
-
-  /**
-   * Recursive function (DFS) which traverses the nodes and collects the corresponding {@link DruidRel} into a list if
-   * they are not of the type {@link DruidUnionRel} or else calls the method with the child nodes. The DFS order of the
-   * nodes are retained, since that is the order in which they will actually be called in {@link DruidUnionRel#runQuery()}
-   *
-   * @param druidRel                The current relNode
-   * @param flattendListAccumulator Accumulator list which needs to be appended by this method
-   */
-  private void flattenOutermostRel(DruidRel<?> druidRel, List<DruidRel<?>> flattendListAccumulator)
-  {
-    if (druidRel instanceof DruidUnionRel) {
-      DruidUnionRel druidUnionRel = (DruidUnionRel) druidRel;
-      druidUnionRel.getInputs().forEach(innerRelNode -> {
-        DruidRel<?> innerDruidRelNode = (DruidRel<?>) innerRelNode; // This type conversion should always be possible
-        flattenOutermostRel(innerDruidRelNode, flattendListAccumulator);
-      });
-    } else {
-      flattendListAccumulator.add(druidRel);
-    }
-  }
-
-  /**
-   * This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel
-   * is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in
-   * {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}.
-   *
-   * The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by
-   * the web console, allowing it to apply a limit to queries without rewriting the original SQL.
-   *
-   * @param root root node
-   *
-   * @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
-   */
-  @Nullable
-  private RelRoot possiblyWrapRootWithOuterLimitFromContext(RelRoot root)
-  {
-    Object outerLimitObj = plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT);
-    Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true);
-    if (outerLimit == null) {
-      return root;
-    }
-
-    final LogicalSort newRootRel;
-
-    if (root.rel instanceof Sort) {
-      Sort sort = (Sort) root.rel;
-
-      final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort);
-      final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit));
-
-      if (newOffsetLimit.equals(originalOffsetLimit)) {
-        // nothing to do, don't bother to make a new sort
-        return root;
-      }
-
-      newRootRel = LogicalSort.create(
-          sort.getInput(),
-          sort.collation,
-          newOffsetLimit.getOffsetAsRexNode(rexBuilder),
-          newOffsetLimit.getLimitAsRexNode(rexBuilder)
-      );
-    } else {
-      newRootRel = LogicalSort.create(
-          root.rel,
-          root.collation,
-          null,
-          new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder)
-      );
-    }
-
-    return new RelRoot(newRootRel, root.validatedRowType, root.kind, root.fields, root.collation);
-  }
-
-  private QueryMaker buildQueryMaker(
-      final RelRoot rootQueryRel,
-      @Nullable final SqlInsert insertOrReplace
-  ) throws ValidationException
-  {
-    if (insertOrReplace != null) {
-      final String targetDataSource = validateAndGetDataSourceForIngest(insertOrReplace);
-      validateColumnsForIngestion(rootQueryRel);
-      return engine.buildQueryMakerForInsert(targetDataSource, rootQueryRel, plannerContext);
-    } else {
-      return engine.buildQueryMakerForSelect(rootQueryRel, plannerContext);
-    }
-  }
-
-  private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory)
-  {
-    return typeFactory.createStructType(
-        ImmutableList.of(
-            Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR),
-            Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR)
-        ),
-        ImmutableList.of("PLAN", "RESOURCES")
-    );
-  }
-
-  /**
-   * Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support.
-   * Expects the target datasource to be either an unqualified name, or a name qualified by the default schema.
-   */
-  private String validateAndGetDataSourceForIngest(final SqlInsert insert) throws ValidationException
-  {
-    final String operatorName = insert.getOperator().getName();
-    if (insert.isUpsert()) {
-      throw new ValidationException("UPSERT is not supported.");
-    }
-
-    if (insert.getTargetColumnList() != null) {
-      throw new ValidationException(operatorName + " with target column list is not supported.");
-    }
-
-    final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable();
-    final String dataSource;
-
-    if (tableIdentifier.names.isEmpty()) {
-      // I don't think this can happen, but include a branch for it just in case.
-      throw new ValidationException(operatorName + " requires target table.");
-    } else if (tableIdentifier.names.size() == 1) {
-      // Unqualified name.
-      dataSource = Iterables.getOnlyElement(tableIdentifier.names);
-    } else {
-      // Qualified name.
-      final String defaultSchemaName =
-          Iterables.getOnlyElement(CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null));
-
-      if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) {
-        dataSource = tableIdentifier.names.get(1);
-      } else {
-        throw new ValidationException(
-            StringUtils.format(
-                "Cannot %s into [%s] because it is not a Druid datasource (schema = %s).",
-                operatorName,
-                tableIdentifier,
-                defaultSchemaName
-            )
-        );
-      }
-    }
-
-    try {
-      IdUtils.validateId(operatorName + " dataSource", dataSource);
-    }
-    catch (IllegalArgumentException e) {
-      throw new ValidationException(e.getMessage());
-    }
-
-    return dataSource;
-  }
-
-  private void validateColumnsForIngestion(RelRoot rootQueryRel) throws ValidationException
-  {
-    // Check that there are no unnamed columns in the insert.
-    for (Pair<Integer, String> field : rootQueryRel.fields) {
-      if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) {
-        throw new ValidationException(UNNAMED_INGESTION_COLUMN_ERROR);
-      }
-    }
-  }
-
-  private String buildSQLPlanningErrorMessage(Throwable exception)
-  {
-    String errorMessage = plannerContext.getPlanningError();
-    if (null == errorMessage && exception instanceof UnsupportedSQLQueryException) {
-      errorMessage = exception.getMessage();
-    }
-    if (null == errorMessage) {
-      errorMessage = "Please check Broker logs for additional details.";
-    } else {
-      // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong.
-      errorMessage = "Possible error: " + errorMessage;
-    }
-    // Finally, add the query itself to error message that user will get.
-    return StringUtils.format("Query not supported. %s SQL was: %s", errorMessage, plannerContext.getSql());
-  }
-
-  private static Set<RelOptTable> getBindableTables(final RelNode relNode)
-  {
-    class HasBindableVisitor extends RelVisitor
-    {
-      private final Set<RelOptTable> found = new HashSet<>();
-
-      @Override
-      public void visit(RelNode node, int ordinal, RelNode parent)
-      {
-        if (node instanceof TableScan) {
-          RelOptTable table = node.getTable();
-          if (table.unwrap(ScannableTable.class) != null && table.unwrap(DruidTable.class) == null) {
-            found.add(table);
-            return;
-          }
-        }
-
-        super.visit(node, ordinal, parent);
-      }
-    }
-
-    final HasBindableVisitor visitor = new HasBindableVisitor();
-    visitor.go(relNode);
-    return visitor.found;
-  }
-
-  private static class EnumeratorIterator<T> implements Iterator<T>
-  {
-    private final Iterator<T> it;
-
-    EnumeratorIterator(Iterator<T> it)
-    {
-      this.it = it;
-    }
-
     @Override
-    public boolean hasNext()
+    public PlannerContext plannerContext()
     {
-      return it.hasNext();
+      return plannerContext;
     }
 
     @Override
-    public T next()
-    {
-      return it.next();
-    }
-  }
-
-  private static class ParsedNodes
-  {
-    @Nullable
-    private final SqlExplain explain;
-
-    @Nullable
-    private final SqlInsert insertOrReplace;
-
-    private final SqlNode query;
-
-    @Nullable
-    private final Granularity ingestionGranularity;
-
-    @Nullable
-    private final List<String> replaceIntervals;
-
-    private ParsedNodes(
-        @Nullable SqlExplain explain,
-        @Nullable SqlInsert insertOrReplace,
-        SqlNode query,
-        @Nullable Granularity ingestionGranularity,
-        @Nullable List<String> replaceIntervals
-    )
-    {
-      this.explain = explain;
-      this.insertOrReplace = insertOrReplace;
-      this.query = query;
-      this.ingestionGranularity = ingestionGranularity;
-      this.replaceIntervals = replaceIntervals;
-    }
-
-    static ParsedNodes create(final SqlNode node, DateTimeZone dateTimeZone) throws ValidationException
+    public SqlEngine engine()
     {
-      SqlNode query = node;
-      SqlExplain explain = null;
-      if (query.getKind() == SqlKind.EXPLAIN) {
-        explain = (SqlExplain) query;
-        query = explain.getExplicandum();
-      }
-
-      if (query.getKind() == SqlKind.INSERT) {
-        if (query instanceof DruidSqlInsert) {
-          return handleInsert(explain, (DruidSqlInsert) query);
-        } else if (query instanceof DruidSqlReplace) {
-          return handleReplace(explain, (DruidSqlReplace) query, dateTimeZone);
-        }
-      }
-
-      if (!query.isA(SqlKind.QUERY)) {
-        throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
-      }
-
-      return new ParsedNodes(explain, null, query, null, null);
+      return engine;
     }
 
-    static ParsedNodes handleInsert(SqlExplain explain, DruidSqlInsert druidSqlInsert) throws ValidationException
-    {
-      SqlNode query = druidSqlInsert.getSource();
-
-      // Check if ORDER BY clause is not provided to the underlying query
-      if (query instanceof SqlOrderBy) {
-        SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
-        SqlNodeList orderByList = sqlOrderBy.orderList;
-        if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
-          throw new ValidationException("Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.");
-        }
-      }
-
-      Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy();
-
-      if (druidSqlInsert.getClusteredBy() != null) {
-        query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlInsert.getClusteredBy());
-      }
-
-      if (!query.isA(SqlKind.QUERY)) {
-        throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
-      }
-
-      return new ParsedNodes(explain, druidSqlInsert, query, ingestionGranularity, null);
-    }
-
-    static ParsedNodes handleReplace(SqlExplain explain, DruidSqlReplace druidSqlReplace, DateTimeZone dateTimeZone)
-        throws ValidationException
-    {
-      SqlNode query = druidSqlReplace.getSource();
-
-      // Check if ORDER BY clause is not provided to the underlying query
-      if (query instanceof SqlOrderBy) {
-        SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
-        SqlNodeList orderByList = sqlOrderBy.orderList;
-        if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
-          throw new ValidationException("Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.");
-        }
-      }
-
-      SqlNode replaceTimeQuery = druidSqlReplace.getReplaceTimeQuery();
-      if (replaceTimeQuery == null) {
-        throw new ValidationException("Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.");
-      }
-
-      Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy();
-      List<String> replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery, ingestionGranularity, dateTimeZone);
-
-      if (druidSqlReplace.getClusteredBy() != null) {
-        query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlReplace.getClusteredBy());
-      }
-
-      if (!query.isA(SqlKind.QUERY)) {
-        throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
-      }
-
-      return new ParsedNodes(explain, druidSqlReplace, query, ingestionGranularity, replaceIntervals);
-    }
-
-    @Nullable
-    public SqlExplain getExplainNode()
-    {
-      return explain;
-    }
-
-    public boolean isSelect()
-    {
-      return insertOrReplace == null;
-    }
-
-    public boolean isInsert()
-    {
-      return insertOrReplace != null && !isReplace();
-    }
-
-    public boolean isReplace()
+    @Override
+    public CalcitePlanner planner()
     {
-      return insertOrReplace instanceof DruidSqlReplace;
+      return planner;
     }
 
-    @Nullable
-    public SqlInsert getInsertOrReplace()
+    @Override
+    public QueryContext queryContext()
     {
-      return insertOrReplace;
+      return plannerContext.getQueryContext();
     }
 
-    @Nullable
-    public List<String> getReplaceIntervals()
+    @Override
+    public SchemaPlus defaultSchema()
     {
-      return replaceIntervals;
+      return frameworkConfig.getDefaultSchema();
     }
 
-    public SqlNode getQueryNode()
+    @Override
+    public ObjectMapper jsonMapper()
     {
-      return query;
+      return plannerContext.getJsonMapper();
     }
 
-    @Nullable
-    public Granularity getIngestionGranularity()
+    @Override
+    public DateTimeZone timeZone()
     {
-      return ingestionGranularity;
+      return plannerContext.getTimeZone();
     }
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
new file mode 100644
index 0000000000..e59b8cf5e7
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.calcite.util.Pair;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
+import org.apache.druid.sql.calcite.run.EngineFeature;
+import org.apache.druid.sql.calcite.run.QueryMaker;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+public abstract class IngestHandler extends QueryHandler
+{
+  // Matches column names of the form EXPR$<digits>, ignoring case. Presumably these are the names
+  // Calcite assigns to un-aliased expressions -- TODO confirm.
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+  // Error text raised by validateColumnsForIngestion() when an output column matches the pattern above.
+  @VisibleForTesting
+  public static final String UNNAMED_INGESTION_COLUMN_ERROR =
+      "Cannot ingest expressions that do not have an alias "
+          + "or columns with names like EXPR$[digit].\n"
+          + "E.g. if you are ingesting \"func(X)\", then you can rewrite it as "
+          + "\"func(X) as myColumn\"";
+
+  // PARTITIONED BY granularity, captured from the ingest node in the constructor; may be null.
+  protected final Granularity ingestionGranularity;
+  // Name of the datasource being written; assigned by validate(), null before that.
+  protected String targetDatasource;
+
+  /**
+   * Creates a handler for an ingest (INSERT or REPLACE) statement.
+   *
+   * @param handlerContext planner services shared by all statement handlers
+   * @param ingestNode     the parsed ingest statement; its PARTITIONED BY granularity is captured here
+   * @param queryNode      the query portion of the statement, passed on to {@link QueryHandler}
+   * @param explain        the enclosing EXPLAIN node, passed on to {@link QueryHandler}
+   */
+  IngestHandler(
+      HandlerContext handlerContext,
+      DruidSqlIngest ingestNode,
+      SqlNode queryNode,
+      SqlExplain explain
+  )
+  {
+    super(handlerContext, queryNode, explain);
+    this.ingestionGranularity = ingestNode.getPartitionedBy();
+  }
+
+  /**
+   * Extracts the query portion of an ingest statement and prepares it for planning.
+   * <p>
+   * A non-empty ORDER BY on the source query is rejected. If the statement carries a
+   * CLUSTERED BY clause, it is folded into the query via
+   * {@link DruidSqlParserUtils#convertClusterByToOrderBy}.
+   *
+   * @param sqlNode the parsed INSERT or REPLACE statement
+   * @return the query node to hand to the planner
+   * @throws ValidationException if the source has an ORDER BY, or the result is not a query
+   */
+  protected static SqlNode convertQuery(DruidSqlIngest sqlNode) throws ValidationException
+  {
+    final SqlNode source = sqlNode.getSource();
+
+    // Ordering must be expressed with CLUSTERED BY, never with ORDER BY on the source query.
+    if (source instanceof SqlOrderBy) {
+      final SqlNodeList ordering = ((SqlOrderBy) source).orderList;
+      final boolean hasOrdering = ordering != null && !ordering.equals(SqlNodeList.EMPTY);
+      if (hasOrdering) {
+        final String opName = sqlNode.getOperator().getName();
+        final String article = "INSERT".equals(opName) ? "an" : "a";
+        throw new ValidationException(StringUtils.format(
+            "Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.",
+            article,
+            opName
+        ));
+      }
+    }
+
+    final SqlNode query = sqlNode.getClusteredBy() == null
+                          ? source
+                          : DruidSqlParserUtils.convertClusterByToOrderBy(source, sqlNode.getClusteredBy());
+
+    if (query.isA(SqlKind.QUERY)) {
+      return query;
+    }
+    throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
+  }
+
+  /**
+   * Returns the name of the ingest node's SQL operator. It is used to build error messages;
+   * {@code convertQuery} compares the same operator name against {@code "INSERT"}.
+   */
+  protected String operationName()
+  {
+    return ingestNode().getOperator().getName();
+  }
+
+  /**
+   * Returns the parsed ingest statement that this handler is processing.
+   */
+  protected abstract DruidSqlIngest ingestNode();
+
+  /**
+   * Validates the ingest statement and records what later planning stages need.
+   * <p>
+   * In order, this method: requires a PARTITIONED BY clause; publishes the segment granularity,
+   * serialized as JSON, as a system query-context parameter; validates the underlying query via
+   * {@link QueryHandler#validate()}; rejects an outer limit supplied through the query context;
+   * and resolves the target datasource, registering a WRITE action against it.
+   *
+   * @throws ValidationException if any of the checks above fails
+   */
+  @Override
+  public void validate() throws ValidationException
+  {
+    if (ingestNode().getPartitionedBy() == null) {
+      throw new ValidationException(StringUtils.format(
+          "%s statements must specify PARTITIONED BY clause explicitly",
+          operationName()
+      ));
+    }
+    try {
+      PlannerContext plannerContext = handlerContext.plannerContext();
+      if (ingestionGranularity != null) {
+        plannerContext.getQueryContext().addSystemParam(
+            DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+            plannerContext.getJsonMapper().writeValueAsString(ingestionGranularity)
+        );
+      }
+    }
+    catch (JsonProcessingException e) {
+      // Keep the Jackson failure as the cause so the reason serialization failed is not lost.
+      throw new ValidationException("Unable to serialize partition granularity.", e);
+    }
+    super.validate();
+    // Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes
+    // the number of rows inserted to be limited which is likely to be confusing and unintended.
+    if (handlerContext.queryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) {
+      throw new ValidationException(
+          StringUtils.format(
+              "%s cannot be provided with %s.",
+              PlannerContext.CTX_SQL_OUTER_LIMIT,
+              operationName()
+          )
+      );
+    }
+    targetDatasource = validateAndGetDataSourceForIngest();
+    // Must come after super.validate(), which replaces resourceActions with the collected read actions.
+    resourceActions.add(new ResourceAction(new Resource(targetDatasource, ResourceType.DATASOURCE), Action.WRITE));
+  }
+
+  /**
+   * Asks the SQL engine for the row type an ingest statement returns, given the validated
+   * row type of the underlying query.
+   */
+  @Override
+  protected RelDataType returnedRowType()
+  {
+    final RelDataTypeFactory relTypeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
+    final RelDataType queryRowType = rootQueryRel.validatedRowType;
+    return handlerContext.engine().resultTypeForInsert(relTypeFactory, queryRowType);
+  }
+
+  /**
+   * Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support.
+   * Expects the target datasource to be either an unqualified name, or a name qualified by the default schema.
+   *
+   * @return the name of the datasource to write to
+   * @throws ValidationException if the statement is an UPSERT, names target columns, targets something
+   *                             other than a datasource in the default schema, or uses an invalid
+   *                             datasource name
+   */
+  private String validateAndGetDataSourceForIngest() throws ValidationException
+  {
+    final SqlInsert insert = ingestNode();
+    if (insert.isUpsert()) {
+      throw new ValidationException("UPSERT is not supported.");
+    }
+
+    if (insert.getTargetColumnList() != null) {
+      throw new ValidationException(operationName() + " with a target column list is not supported.");
+    }
+
+    final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable();
+    final String dataSource;
+
+    if (tableIdentifier.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new ValidationException(operationName() + " requires a target table.");
+    } else if (tableIdentifier.names.size() == 1) {
+      // Unqualified name.
+      dataSource = Iterables.getOnlyElement(tableIdentifier.names);
+    } else {
+      // Qualified name.
+      final String defaultSchemaName =
+          Iterables.getOnlyElement(CalciteSchema.from(handlerContext.defaultSchema()).path(null));
+
+      if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) {
+        dataSource = tableIdentifier.names.get(1);
+      } else {
+        throw new ValidationException(
+            StringUtils.format(
+                "Cannot %s into %s because it is not a Druid datasource.",
+                operationName(),
+                tableIdentifier
+            )
+        );
+      }
+    }
+
+    try {
+      IdUtils.validateId(operationName() + " dataSource", dataSource);
+    }
+    catch (IllegalArgumentException e) {
+      // Same user-facing message as before, but keep the original exception as the cause.
+      throw new ValidationException(e.getMessage(), e);
+    }
+
+    return dataSource;
+  }
+
+  /**
+   * Plans the ingest statement by delegating directly to {@code planWithDruidConvention()}.
+   */
+  @Override
+  protected PlannerResult planForDruid() throws ValidationException
+  {
+    return planWithDruidConvention();
+  }
+
+  /**
+   * Checks the output columns of the planned query, then asks the SQL engine for a
+   * {@link QueryMaker} that writes into the target datasource.
+   *
+   * @param rootQueryRel root of the planned query (shadows the field of the same name)
+   * @throws ValidationException if any output column is unnamed
+   */
+  @Override
+  protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException
+  {
+    // Reject unnamed columns before any engine-specific work is done.
+    validateColumnsForIngestion(rootQueryRel);
+
+    return handlerContext
+        .engine()
+        .buildQueryMakerForInsert(targetDatasource, rootQueryRel, handlerContext.plannerContext());
+  }
+
+  /**
+   * Ensures that no output column of the query has a name matching {@code UNNAMED_COLUMN_PATTERN}.
+   *
+   * @param rootQueryRel root of the planned query whose output fields are inspected
+   * @throws ValidationException with {@link #UNNAMED_INGESTION_COLUMN_ERROR} if such a column exists
+   */
+  private void validateColumnsForIngestion(RelRoot rootQueryRel) throws ValidationException
+  {
+    final boolean hasUnnamedColumn = rootQueryRel.fields
+        .stream()
+        .anyMatch(field -> UNNAMED_COLUMN_PATTERN.matcher(field.right).matches());
+
+    if (hasUnnamedColumn) {
+      throw new ValidationException(UNNAMED_INGESTION_COLUMN_ERROR);
+    }
+  }
+
+  /**
+   * Statement handler for INSERT. Adds an engine-capability check on top of the
+   * validation shared by all ingest statements.
+   */
+  protected static class InsertHandler extends IngestHandler
+  {
+    private final DruidSqlInsert sqlNode;
+
+    /**
+     * @throws ValidationException if the statement's source query cannot be converted
+     *                             (see {@code convertQuery})
+     */
+    public InsertHandler(
+        SqlStatementHandler.HandlerContext handlerContext,
+        DruidSqlInsert sqlNode,
+        SqlExplain explain
+    ) throws ValidationException
+    {
+      super(handlerContext, sqlNode, convertQuery(sqlNode), explain);
+      this.sqlNode = sqlNode;
+    }
+
+    @Override
+    public SqlNode sqlNode()
+    {
+      return this.sqlNode;
+    }
+
+    @Override
+    protected DruidSqlIngest ingestNode()
+    {
+      return this.sqlNode;
+    }
+
+    /**
+     * Fails fast when the engine cannot run INSERT, then performs the shared ingest validation.
+     */
+    @Override
+    public void validate() throws ValidationException
+    {
+      final boolean canInsert = handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_INSERT);
+      if (!canInsert) {
+        final String engineName = handlerContext.engine().name();
+        throw new ValidationException(
+            StringUtils.format("Cannot execute INSERT with SQL engine '%s'.", engineName)
+        );
+      }
+      super.validate();
+    }
+  }
+
+  /**
+   * Statement handler for REPLACE. In addition to the validation shared by all ingest
+   * statements, it checks the engine capability, requires an OVERWRITE clause and publishes
+   * the time chunks to replace as a system query-context parameter.
+   */
+  protected static class ReplaceHandler extends IngestHandler
+  {
+    private final DruidSqlReplace sqlNode;
+    // Intervals derived from the OVERWRITE clause; populated by validate().
+    private List<String> replaceIntervals;
+
+    /**
+     * @throws ValidationException if the statement's source query cannot be converted
+     *                             (see {@code convertQuery})
+     */
+    public ReplaceHandler(
+        SqlStatementHandler.HandlerContext handlerContext,
+        DruidSqlReplace sqlNode,
+        SqlExplain explain
+    ) throws ValidationException
+    {
+      super(handlerContext, sqlNode, convertQuery(sqlNode), explain);
+      this.sqlNode = sqlNode;
+    }
+
+    @Override
+    public SqlNode sqlNode()
+    {
+      return this.sqlNode;
+    }
+
+    @Override
+    protected DruidSqlIngest ingestNode()
+    {
+      return this.sqlNode;
+    }
+
+    /**
+     * Validates the REPLACE-specific parts of the statement around the shared ingest validation.
+     */
+    @Override
+    public void validate() throws ValidationException
+    {
+      final boolean canReplace = handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_REPLACE);
+      if (!canReplace) {
+        final String engineName = handlerContext.engine().name();
+        throw new ValidationException(
+            StringUtils.format("Cannot execute REPLACE with SQL engine '%s'.", engineName)
+        );
+      }
+
+      final SqlNode timeChunkQuery = sqlNode.getReplaceTimeQuery();
+      if (timeChunkQuery == null) {
+        throw new ValidationException("Missing time chunk information in OVERWRITE clause for REPLACE. Use "
+            + "OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.");
+      }
+
+      // NOTE(review): the granularity is used here before super.validate() has checked that
+      // PARTITIONED BY is present -- confirm the helper tolerates a null granularity.
+      replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(
+          timeChunkQuery,
+          ingestionGranularity,
+          handlerContext.timeZone()
+      );
+      super.validate();
+
+      if (replaceIntervals == null) {
+        return;
+      }
+      final String timeChunks = String.join(",", replaceIntervals);
+      handlerContext.queryContext().addSystemParam(DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS, timeChunks);
+    }
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
index 57e3ba2cbc..cbb07ddb48 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
@@ -32,12 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class PlannerResult
 {
-  private final Supplier<QueryResponse> resultsSupplier;
+  private final Supplier<QueryResponse<Object[]>> resultsSupplier;
   private final RelDataType rowType;
   private final AtomicBoolean didRun = new AtomicBoolean();
 
   public PlannerResult(
-      final Supplier<QueryResponse> resultsSupplier,
+      final Supplier<QueryResponse<Object[]>> resultsSupplier,
       final RelDataType rowType
   )
   {
@@ -53,7 +53,7 @@ public class PlannerResult
   /**
    * Run the query
    */
-  public QueryResponse run()
+  public QueryResponse<Object[]> run()
   {
     if (!didRun.compareAndSet(false, true)) {
       // Safety check.
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
new file mode 100644
index 0000000000..1d6a71b542
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.BindableConvention;
+import org.apache.calcite.interpreter.BindableRel;
+import org.apache.calcite.interpreter.Bindables;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.calcite.util.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.server.QueryResponse;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.rel.DruidConvention;
+import org.apache.druid.sql.calcite.rel.DruidQuery;
+import org.apache.druid.sql.calcite.rel.DruidRel;
+import org.apache.druid.sql.calcite.rel.DruidUnionRel;
+import org.apache.druid.sql.calcite.run.EngineFeature;
+import org.apache.druid.sql.calcite.run.QueryMaker;
+import org.apache.druid.sql.calcite.table.DruidTable;
+import org.apache.druid.utils.Throwables;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract base class for handlers that revolve around queries: SELECT,
+ * INSERT and REPLACE. This class handles the common SELECT portion of the statement.
+ */
+public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHandler
+{
+  static final EmittingLogger log = new EmittingLogger(QueryHandler.class);
+
+  // The query to plan, as supplied to the constructor.
+  protected SqlNode queryNode;
+  // Enclosing EXPLAIN node; prepare() and the planning methods test it for null to decide
+  // whether to return an explanation instead of query results.
+  protected SqlExplain explain;
+  // Result of Calcite validation; assigned by validate().
+  protected SqlNode validatedQueryNode;
+  // Guard that makes prepare() a no-op after its first call.
+  private boolean isPrepared;
+  // Relational form of the validated query; assigned by prepare().
+  protected RelRoot rootQueryRel;
+  // Row and parameter type information; assigned by prepare().
+  private PrepareResult prepareResult;
+  // Assigned by plan() from the planner's type factory.
+  protected RexBuilder rexBuilder;
+
+  /**
+   * @param handlerContext planner services shared by all statement handlers
+   * @param sqlNode        the query to validate and plan; stored as {@code queryNode}
+   * @param explain        the enclosing EXPLAIN node; the planning code tests it for null
+   */
+  public QueryHandler(SqlStatementHandler.HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain)
+  {
+    super(handlerContext);
+    this.queryNode = sqlNode;
+    this.explain = explain;
+  }
+
+  /**
+   * Validates the query with Calcite, after substituting any supplied parameter values,
+   * and collects the resource actions referenced by the validated tree.
+   *
+   * @throws ValidationException if Calcite rejects the query
+   */
+  @Override
+  public void validate() throws ValidationException
+  {
+    final CalcitePlanner calcitePlanner = handlerContext.planner();
+    validatedQueryNode = calcitePlanner.validate(rewriteParameters());
+
+    // Walk the validated tree to gather the resources the statement touches.
+    final SqlResourceCollectorShuttle collector =
+        new SqlResourceCollectorShuttle(calcitePlanner.getValidator(), handlerContext.plannerContext());
+    validatedQueryNode.accept(collector);
+    resourceActions = collector.getResourceActions();
+  }
+
+  /**
+   * Uses {@link SqlParameterizerShuttle} to rewrite the query, swapping any
+   * {@link org.apache.calcite.sql.SqlDynamicParam} for its literal replacement early.
+   * <p>
+   * Replacement happens only if the client provided parameter values. A PREPARE-only
+   * request has no values even if the statement contains parameters. For a PLAN, a
+   * statement with parameters but no values is caught later.
+   *
+   * @return the original query node if no values were supplied, otherwise the rewritten node
+   */
+  private SqlNode rewriteParameters()
+  {
+    final PlannerContext plannerContext = handlerContext.plannerContext();
+    final boolean hasParameterValues = !plannerContext.getParameters().isEmpty();
+    return hasParameterValues
+           ? queryNode.accept(new SqlParameterizerShuttle(plannerContext))
+           : queryNode;
+  }
+
+  /**
+   * Converts the validated query to relational form and computes the {@link PrepareResult}.
+   * Only the first call does any work. The flag is set before the work starts, so later
+   * calls return immediately even if the first attempt threw.
+   */
+  @Override
+  public void prepare()
+  {
+    if (isPrepared) {
+      return;
+    }
+    isPrepared = true;
+
+    rootQueryRel = handlerContext.planner().rel(validatedQueryNode);
+    final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
+    final RelDataType parameterTypes =
+        handlerContext.planner().getValidator().getParameterRowType(validatedQueryNode);
+
+    // An EXPLAIN returns the fixed explain row type; otherwise the subclass decides.
+    final RelDataType rowTypeForCaller = explain == null
+                                         ? returnedRowType()
+                                         : getExplainStructType(typeFactory);
+
+    prepareResult = new PrepareResult(rootQueryRel.validatedRowType, rowTypeForCaller, parameterTypes);
+  }
+
+  /**
+   * Returns the result computed by {@link #prepare()}; null if {@code prepare()} has not run.
+   */
+  @Override
+  public PrepareResult prepareResult()
+  {
+    return prepareResult;
+  }
+
+  /**
+   * Returns the row type reported to the caller for a non-EXPLAIN statement. {@link #prepare()}
+   * calls this after {@code rootQueryRel} has been assigned.
+   */
+  protected abstract RelDataType returnedRowType();
+
+  /**
+   * Builds the row type of an EXPLAIN result: two VARCHAR columns named PLAN and RESOURCES.
+   */
+  private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory)
+  {
+    final RelDataType planColumnType = Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR);
+    final RelDataType resourcesColumnType = Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR);
+    return typeFactory.createStructType(
+        ImmutableList.of(planColumnType, resourcesColumnType),
+        ImmutableList.of("PLAN", "RESOURCES")
+    );
+  }
+
+  /**
+   * Plans the statement. Queries that touch tables requiring the BINDABLE convention are
+   * planned with it, provided the engine allows that; all others go through
+   * {@link #planForDruid()}.
+   * <p>
+   * A failure caused by a {@link RelOptPlanner.CannotPlanException} is logged and converted to
+   * an {@link UnsupportedSQLQueryException}. Any other exception is rethrown untouched.
+   *
+   * @throws ValidationException if bindable tables are queried with an engine that does not allow it
+   */
+  @Override
+  public PlannerResult plan() throws ValidationException
+  {
+    prepare();
+    final Set<RelOptTable> bindableTables = getBindableTables(rootQueryRel.rel);
+
+    // the planner's type factory is not available until after parsing
+    rexBuilder = new RexBuilder(handlerContext.planner().getTypeFactory());
+
+    try {
+      if (bindableTables.isEmpty()) {
+        // Druid convention is used whenever there are no tables that require BINDABLE.
+        return planForDruid();
+      }
+
+      // Consider BINDABLE convention when necessary. Used for metadata tables.
+      if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.ALLOW_BINDABLE_PLAN)) {
+        final String tableNames = bindableTables
+            .stream()
+            .map(table -> Joiner.on(".").join(table.getQualifiedName()))
+            .collect(Collectors.joining(", "));
+        throw new ValidationException(
+            StringUtils.format(
+                "Cannot query table%s %s with SQL engine '%s'.",
+                bindableTables.size() != 1 ? "s" : "",
+                tableNames,
+                handlerContext.engine().name()
+            )
+        );
+      }
+      return planWithBindableConvention();
+    }
+    catch (Exception e) {
+      final Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class);
+      if (cannotPlanException != null) {
+        // Only include the stack trace in the log when the query runs in debug mode.
+        final Logger logger = handlerContext.queryContext().isDebug() ? log : log.noStackTrace();
+        final String errorMessage = buildSQLPlanningErrorMessage(cannotPlanException);
+        logger.warn(e, errorMessage);
+        throw new UnsupportedSQLQueryException(errorMessage);
+      }
+      // Not a CannotPlanException, rethrow without logging.
+      throw e;
+    }
+  }
+
+  /**
+   * Collects the tables scanned by the given plan that can be unwrapped to a
+   * {@link ScannableTable} but not to a {@link DruidTable}.
+   *
+   * @param relNode root of the relational tree to inspect
+   * @return the matching tables; empty if there are none
+   */
+  private static Set<RelOptTable> getBindableTables(final RelNode relNode)
+  {
+    final Set<RelOptTable> bindable = new HashSet<>();
+
+    final RelVisitor collector = new RelVisitor()
+    {
+      @Override
+      public void visit(RelNode node, int ordinal, RelNode parent)
+      {
+        if (node instanceof TableScan) {
+          final RelOptTable table = node.getTable();
+          final boolean scannable = table.unwrap(ScannableTable.class) != null;
+          if (scannable && table.unwrap(DruidTable.class) == null) {
+            bindable.add(table);
+            // Nothing further to look at below a matching scan.
+            return;
+          }
+        }
+        super.visit(node, ordinal, parent);
+      }
+    };
+
+    collector.go(relNode);
+    return bindable;
+  }
+
+  /**
+   * Construct a {@link PlannerResult} for a fall-back 'bindable' rel, for
+   * things that are not directly translatable to native Druid queries such
+   * as system tables and just a general purpose (but definitely not optimized)
+   * fall-back.
+   *
+   * See {@link #planWithDruidConvention} which will handle things which are
+   * directly translatable to native Druid queries.
+   *
+   * The bindable path handles parameter substitution of any values not
+   * bound by the earlier steps.
+   *
+   * @return an explanation result if this is an EXPLAIN, otherwise a result whose supplier
+   *         binds the rel and exposes the enumerated rows as a sequence
+   */
+  private PlannerResult planWithBindableConvention()
+  {
+    CalcitePlanner planner = handlerContext.planner();
+    BindableRel bindableRel = (BindableRel) planner.transform(
+        CalciteRulesManager.BINDABLE_CONVENTION_RULES,
+        planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(rootQueryRel.collation),
+        rootQueryRel.rel
+    );
+
+    if (!rootQueryRel.isRefTrivial()) {
+      // Add a projection on top to accommodate root.fields.
+      final List<RexNode> projects = new ArrayList<>();
+      // Local builder taken from the rel's cluster; it shadows the rexBuilder field.
+      final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder();
+      for (int field : Pair.left(rootQueryRel.fields)) {
+        projects.add(rexBuilder.makeInputRef(bindableRel, field));
+      }
+      bindableRel = new Bindables.BindableProject(
+          bindableRel.getCluster(),
+          bindableRel.getTraitSet(),
+          bindableRel,
+          projects,
+          rootQueryRel.validatedRowType
+      );
+    }
+
+    PlannerContext plannerContext = handlerContext.plannerContext();
+    if (explain != null) {
+      return planExplanation(bindableRel, false);
+    } else {
+      // Effectively-final copy so the lambda below can capture the (possibly re-assigned) rel.
+      final BindableRel theRel = bindableRel;
+      final DataContext dataContext = plannerContext.createDataContext(
+              planner.getTypeFactory(),
+              plannerContext.getParameters()
+      );
+      // Binding is deferred until the supplier is invoked. One enumerator is created per
+      // invocation and is closed through the sequence's baggage.
+      final Supplier<QueryResponse<Object[]>> resultsSupplier = () -> {
+        final Enumerable<?> enumerable = theRel.bind(dataContext);
+        final Enumerator<?> enumerator = enumerable.enumerator();
+        return QueryResponse.withEmptyContext(
+            Sequences.withBaggage(new BaseSequence<>(
+              new BaseSequence.IteratorMaker<Object[], QueryHandler.EnumeratorIterator<Object[]>>()
+              {
+                @Override
+                public QueryHandler.EnumeratorIterator<Object[]> make()
+                {
+                  // Every call wraps the same enumerator instance created above.
+                  return new QueryHandler.EnumeratorIterator<>(new Iterator<Object[]>()
+                  {
+                    // NOTE(review): hasNext() advances the enumerator, so it is not idempotent.
+                    // Presumably EnumeratorIterator (declared outside this view) calls it exactly
+                    // once per next() -- confirm.
+                    @Override
+                    public boolean hasNext()
+                    {
+                      return enumerator.moveNext();
+                    }
+
+                    @Override
+                    public Object[] next()
+                    {
+                      return (Object[]) enumerator.current();
+                    }
+                  });
+                }
+
+                // Intentionally empty: the enumerator is closed by the baggage passed to withBaggage.
+                @Override
+                public void cleanup(QueryHandler.EnumeratorIterator<Object[]> iterFromMake)
+                {
+
+                }
+              }
+          ), enumerator::close)
+      );
+      };
+      return new PlannerResult(resultsSupplier, rootQueryRel.validatedRowType);
+    }
+  }
+
+  /**
+   * Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode}.
+   * <p>
+   * The result holds a single row: the plan text and the JSON-serialized set of resources.
+   * The plan text is Calcite's plan dump unless native-query explain applies and succeeds.
+   *
+   * @param rel                          the planned rel to explain
+   * @param isDruidConventionExplanation whether the rel was planned with the Druid convention
+   */
+  protected PlannerResult planExplanation(
+      final RelNode rel,
+      final boolean isDruidConventionExplanation
+  )
+  {
+    final PlannerContext plannerContext = handlerContext.plannerContext();
+    String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel());
+    String resourcesString;
+    try {
+      // Show the native queries instead of Calcite's explain if the legacy flag is turned off
+      final boolean preferNativeExplain =
+          isDruidConventionExplanation
+          && rel instanceof DruidRel
+          && plannerContext.getPlannerConfig().isUseNativeQueryExplain();
+      if (preferNativeExplain) {
+        try {
+          explanation = explainSqlPlanAsNativeQueries((DruidRel<?>) rel);
+        }
+        catch (Exception ex) {
+          // Keep the Calcite plan dump computed above.
+          log.warn(ex, "Unable to translate to a native Druid query. Resorting to legacy Druid explain plan.");
+        }
+      }
+
+      final Set<Resource> resources = plannerContext
+          .getResourceActions()
+          .stream()
+          .map(ResourceAction::getResource)
+          .collect(Collectors.toSet());
+      resourcesString = plannerContext.getJsonMapper().writeValueAsString(resources);
+    }
+    catch (JsonProcessingException jpe) {
+      // this should never happen, we create the Resources here, not a user
+      log.error(jpe, "Encountered exception while serializing resources for explain output");
+      resourcesString = null;
+    }
+
+    final Object[] explainRow = new Object[]{explanation, resourcesString};
+    final Supplier<QueryResponse<Object[]>> resultsSupplier = Suppliers.ofInstance(
+        QueryResponse.withEmptyContext(Sequences.simple(ImmutableList.of(explainRow)))
+    );
+    return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory()));
+  }
+
+  /**
+   * Renders the plan as the list of native Druid queries it translates to.
+   *
+   * This method doesn't utilize the Calcite's internal {@link RelOptUtil#dumpPlan} since that tends to be verbose
+   * and not indicative of the native Druid queries which will get executed.
+   * This method assumes that the planner has converted the RelNodes to DruidRels.
+   *
+   * @param rel Instance of the root {@link DruidRel} which is formed by running the planner transformations on it
+   * @return A string representing an array of native queries that correspond to the given SQL query, in JSON format.
+   *         Each array element is an object with a "query" field and a "signature" field.
+   * @throws JsonProcessingException if the assembled array of queries cannot be written out as a JSON string
+   */
+  private String explainSqlPlanAsNativeQueries(DruidRel<?> rel) throws JsonProcessingException
+  {
+    final ObjectMapper jsonMapper = handlerContext.jsonMapper();
+    final List<DruidQuery> druidQueryList = flattenOutermostRel(rel)
+        .stream()
+        .map(druidRel -> druidRel.toDruidQuery(false))
+        .collect(Collectors.toList());
+
+    // Putting the queries as object node in an ArrayNode, since directly returning a list causes issues when
+    // serializing the "queryType". Another method would be to create a POJO containing query and signature, and then
+    // serializing it using normal list method.
+    final ArrayNode nativeQueriesArrayNode = jsonMapper.createArrayNode();
+
+    for (DruidQuery druidQuery : druidQueryList) {
+      final Query<?> nativeQuery = druidQuery.getQuery();
+      final ObjectNode objectNode = jsonMapper.createObjectNode();
+      // ObjectNode#put(String, JsonNode) is deprecated in Jackson; set(String, JsonNode) stores the same field.
+      objectNode.set("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class));
+      objectNode.set("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class));
+      nativeQueriesArrayNode.add(objectNode);
+    }
+
+    return jsonMapper.writeValueAsString(nativeQueriesArrayNode);
+  }
+
+  /**
+   * Flattens a tree of {@link DruidUnionRel}s into the list of non-union {@link DruidRel}s found beneath it.
+   * It is implicitly assumed that a {@link DruidUnionRel} can never be the child of a non {@link DruidUnionRel}
+   * node.
+   * E.g. a DruidRel structure of kind:<pre><code>
+   * DruidUnionRel
+   *  DruidUnionRel
+   *    DruidRel (A)
+   *    DruidRel (B)
+   *  DruidRel(C)
+   * </code></pre>will return {@code [DruidRel(A), DruidRel(B), DruidRel(C)]}.
+   *
+   * @param outermostDruidRel The outermost rel which is to be flattened
+   * @return a new list of DruidRel's which do not have a DruidUnionRel nested in between them
+   */
+  private List<DruidRel<?>> flattenOutermostRel(DruidRel<?> outermostDruidRel)
+  {
+    final List<DruidRel<?>> flattened = new ArrayList<>();
+    // The two-argument overload does the traversal and appends into the list handed to it.
+    flattenOutermostRel(outermostDruidRel, flattened);
+    return flattened;
+  }
+
+  /**
+   * Depth-first traversal that appends every {@link DruidRel} which is not a {@link DruidUnionRel} to the
+   * accumulator, and recurses into the inputs of every {@link DruidUnionRel}. The DFS order of the nodes is
+   * retained, since that is the order in which they will actually be called in {@link DruidUnionRel#runQuery()}
+   *
+   * @param druidRel                 The current relNode
+   * @param flattenedListAccumulator Accumulator list which is appended to by this method
+   */
+  private void flattenOutermostRel(DruidRel<?> druidRel, List<DruidRel<?>> flattenedListAccumulator)
+  {
+    if (!(druidRel instanceof DruidUnionRel)) {
+      // Leaf of the union tree: collect it as-is.
+      flattenedListAccumulator.add(druidRel);
+      return;
+    }
+
+    for (RelNode innerRelNode : ((DruidUnionRel) druidRel).getInputs()) {
+      // This type conversion should always be possible
+      flattenOutermostRel((DruidRel<?>) innerRelNode, flattenedListAccumulator);
+    }
+  }
+
+  /**
+   * Produces the {@link PlannerResult} for this statement. Each concrete handler supplies its own
+   * implementation; {@link SelectHandler} implements it by delegating to {@link #planWithDruidConvention()}.
+   *
+   * @return the planner result for the statement
+   * @throws ValidationException declared so that implementations may propagate validation failures
+   */
+  protected abstract PlannerResult planForDruid() throws ValidationException;
+
+  /**
+   * Construct a {@link PlannerResult} for a {@link RelNode} that is directly translatable to a native Druid query.
+   *
+   * The root is optionally wrapped with the outer limit from the query context, a {@link QueryMaker} is built
+   * and registered on the planner context, dynamic parameters are substituted, and the tree is transformed
+   * with the Druid convention rules. For EXPLAIN the transformed tree is explained; otherwise the returned
+   * result runs the query lazily, when its supplier is invoked.
+   *
+   * @return the planner result, either an explanation or a deferred query execution
+   * @throws ValidationException if building the query maker fails validation
+   */
+  protected PlannerResult planWithDruidConvention() throws ValidationException
+  {
+    final RelRoot limitedRoot = possiblyWrapRootWithOuterLimitFromContext(rootQueryRel);
+    final QueryMaker maker = buildQueryMaker(limitedRoot);
+    final PlannerContext plannerContext = handlerContext.plannerContext();
+    plannerContext.setQueryMaker(maker);
+
+    // Fall-back dynamic parameter substitution using {@link RelParameterizerShuttle}
+    // in the event that {@link #rewriteDynamicParameters(SqlNode)} was unable to
+    // successfully substitute all parameter values, and will cause a failure if any
+    // dynamic parameters are not bound. This occurs at least for DATE parameters
+    // with integer values.
+    //
+    // This check also catches the case where we did not do a parameter check earlier
+    // because no values were provided. (Values are not required in the PREPARE case
+    // but now that we're planning, we require them.)
+    final RelNode parameterized = limitedRoot.rel.accept(
+        new RelParameterizerShuttle(plannerContext)
+    );
+
+    final CalcitePlanner planner = handlerContext.planner();
+    final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
+        CalciteRulesManager.DRUID_CONVENTION_RULES,
+        planner.getEmptyTraitSet()
+               .replace(DruidConvention.instance())
+               .plus(rootQueryRel.collation),
+        parameterized
+    );
+
+    if (explain != null) {
+      return planExplanation(druidRel, true);
+    }
+
+    // Compute row type.
+    final RelDataType rowType = prepareResult.getReturnedRowType();
+
+    // Start the query. Nothing runs until the supplier is invoked.
+    final Supplier<QueryResponse<Object[]>> resultsSupplier = () -> {
+      // sanity check
+      final Set<ResourceAction> readResourceActions =
+          plannerContext.getResourceActions()
+                        .stream()
+                        .filter(action -> action.getAction() == Action.READ)
+                        .collect(Collectors.toSet());
+      final Set<String> dataSourceNames = druidRel.getDataSourceNames();
+      final boolean emptinessMatches = readResourceActions.isEmpty() == dataSourceNames.isEmpty();
+      Preconditions.checkState(
+          emptinessMatches
+          // The resources found in the plannerContext can be less than the datasources in
+          // the query plan, because the query planner can eliminate empty tables by replacing
+          // them with InlineDataSource of empty rows.
+          || readResourceActions.size() >= dataSourceNames.size(),
+          "Authorization sanity check failed"
+      );
+
+      return druidRel.runQuery();
+    };
+
+    return new PlannerResult(resultsSupplier, rowType);
+  }
+
+  /**
+   * Applies the outer limit from the query context, if any, to the given root. When the root rel is already a
+   * {@link Sort}, the outer limit is merged into it, similar to what is going on in
+   * {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}; otherwise the root is wrapped with a
+   * {@link LogicalSort} that applies the limit without changing the ordering.
+   *
+   * The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by
+   * the web console, allowing it to apply a limit to queries without rewriting the original SQL.
+   *
+   * NOTE(review): every return path visible here yields either {@code root} itself or a new {@link RelRoot}, so the
+   * {@code @Nullable} annotation looks unnecessary; confirm before removing it.
+   *
+   * @param root root node
+   *
+   * @return {@code root} unchanged when no outer limit is set or when merging the limit changes nothing; otherwise
+   * a new root whose rel is a limiting logical sort.
+   */
+  @Nullable
+  private RelRoot possiblyWrapRootWithOuterLimitFromContext(RelRoot root)
+  {
+    final Object contextValue = handlerContext.queryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT);
+    final Long outerLimit = DimensionHandlerUtils.convertObjectToLong(contextValue, true);
+    if (outerLimit == null) {
+      return root;
+    }
+
+    final LogicalSort limitedRel;
+
+    if (root.rel instanceof Sort) {
+      final Sort existingSort = (Sort) root.rel;
+
+      final OffsetLimit existingOffsetLimit = OffsetLimit.fromSort(existingSort);
+      final OffsetLimit mergedOffsetLimit = existingOffsetLimit.andThen(new OffsetLimit(0, outerLimit));
+
+      if (mergedOffsetLimit.equals(existingOffsetLimit)) {
+        // nothing to do, don't bother to make a new sort
+        return root;
+      }
+
+      // Rebuild the sort over the same input and collation, with the merged offset and limit.
+      limitedRel = LogicalSort.create(
+          existingSort.getInput(),
+          existingSort.collation,
+          mergedOffsetLimit.getOffsetAsRexNode(rexBuilder),
+          mergedOffsetLimit.getLimitAsRexNode(rexBuilder)
+      );
+    } else {
+      // No sort at the root: add one that only limits (no offset), keeping the root collation.
+      limitedRel = LogicalSort.create(
+          root.rel,
+          root.collation,
+          null,
+          new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder)
+      );
+    }
+
+    return new RelRoot(limitedRel, root.validatedRowType, root.kind, root.fields, root.collation);
+  }
+
+  /**
+   * Creates the {@link QueryMaker} for the given root. {@link #planWithDruidConvention()} calls this with the
+   * possibly outer-limited root and registers the result on the planner context. {@link SelectHandler}
+   * implements it by asking the engine for a SELECT query maker.
+   *
+   * @param rootQueryRel the root to build a query maker for
+   * @return the query maker to register on the planner context
+   * @throws ValidationException declared so that implementations may reject the statement
+   */
+  protected abstract QueryMaker buildQueryMaker(RelRoot rootQueryRel) throws ValidationException;
+
+  /**
+   * Builds the user-facing message for a query that could not be planned.
+   *
+   * The detail comes from the planning error recorded on the planner context; if none was recorded and the
+   * exception is an {@link UnsupportedSQLQueryException}, its message is used instead. With no detail at all,
+   * the user is pointed at the Broker logs. The SQL text is always appended.
+   *
+   * @param exception the failure raised while planning
+   * @return the formatted error message
+   */
+  private String buildSQLPlanningErrorMessage(Throwable exception)
+  {
+    String planningError = handlerContext.plannerContext().getPlanningError();
+    if (planningError == null && exception instanceof UnsupportedSQLQueryException) {
+      planningError = exception.getMessage();
+    }
+
+    // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong.
+    final String detail = planningError == null
+                          ? "Please check Broker logs for additional details."
+                          : "Possible error: " + planningError;
+
+    // Finally, add the query itself to error message that user will get.
+    return StringUtils.format(
+        "Query not supported. %s SQL was: %s",
+        detail,
+        handlerContext.plannerContext().getSql()
+    );
+  }
+
+  /**
+   * Handler for SELECT statements. Validation first checks that the SQL engine supports SELECT; planning uses
+   * the Druid convention, and the row type and query maker are obtained from the engine's SELECT-specific methods.
+   */
+  public static class SelectHandler extends QueryHandler
+  {
+    private final SqlNode sqlNode;
+
+    public SelectHandler(
+        HandlerContext handlerContext,
+        SqlNode sqlNode,
+        SqlExplain explain
+    )
+    {
+      super(handlerContext, sqlNode, explain);
+      this.sqlNode = sqlNode;
+    }
+
+    /**
+     * Returns the SQL node this handler was constructed with.
+     */
+    @Override
+    public SqlNode sqlNode()
+    {
+      return sqlNode;
+    }
+
+    /**
+     * Rejects the statement if the engine lacks {@link EngineFeature#CAN_SELECT}, then runs the inherited validation.
+     */
+    @Override
+    public void validate() throws ValidationException
+    {
+      final boolean canSelect = handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_SELECT);
+      if (!canSelect) {
+        final String message = StringUtils.format(
+            "Cannot execute SELECT with SQL engine '%s'.",
+            handlerContext.engine().name()
+        );
+        throw new ValidationException(message);
+      }
+      super.validate();
+    }
+
+    /**
+     * Asks the engine for the SELECT result type, based on the validated row type of the root.
+     */
+    @Override
+    protected RelDataType returnedRowType()
+    {
+      final RelDataTypeFactory factory = rootQueryRel.rel.getCluster().getTypeFactory();
+      return handlerContext.engine().resultTypeForSelect(factory, rootQueryRel.validatedRowType);
+    }
+
+    @Override
+    protected PlannerResult planForDruid() throws ValidationException
+    {
+      return planWithDruidConvention();
+    }
+
+    /**
+     * Asks the engine for a SELECT query maker for the given root.
+     */
+    @Override
+    protected QueryMaker buildQueryMaker(final RelRoot relRoot) throws ValidationException
+    {
+      // The parameter is deliberately not named rootQueryRel, so it does not shadow the inherited field.
+      return handlerContext.engine().buildQueryMakerForSelect(
+          relRoot,
+          handlerContext.plannerContext()
+      );
+    }
+  }
+
+  /**
+   * Iterator that forwards {@link #hasNext()} and {@link #next()} to a wrapped iterator. {@code remove()} is
+   * not overridden, so the {@link Iterator} default applies.
+   *
+   * @param <T> element type
+   */
+  private static class EnumeratorIterator<T> implements Iterator<T>
+  {
+    private final Iterator<T> delegate;
+
+    EnumeratorIterator(Iterator<T> it)
+    {
+      this.delegate = it;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return delegate.hasNext();
+    }
+
+    @Override
+    public T next()
+    {
+      return delegate.next();
+    }
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlStatementHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlStatementHandler.java
new file mode 100644
index 0000000000..fa8c4fdb17
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlStatementHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.joda.time.DateTimeZone;
+
+import java.util.Set;
+
+/**
+ * Handler for a SQL statement. Follows the same lifecycle as the planner,
+ * however this class handles one specific kind of SQL statement.
+ */
+public interface SqlStatementHandler
+{
+  /**
+   * Returns the SQL parse node associated with this handler.
+   */
+  SqlNode sqlNode();
+  /**
+   * Validates the statement.
+   *
+   * @throws ValidationException if the statement is not valid
+   */
+  void validate() throws ValidationException;
+  /**
+   * Returns the resource actions for the statement.
+   * NOTE(review): presumably only meaningful after {@link #validate()} has run; confirm against implementations.
+   */
+  Set<ResourceAction> resourceActions();
+  /**
+   * Prepares the statement; the outcome is exposed through {@link #prepareResult()}.
+   */
+  void prepare();
+  /**
+   * Returns the {@link PrepareResult} for the statement.
+   * NOTE(review): presumably produced by {@link #prepare()}; confirm against implementations.
+   */
+  PrepareResult prepareResult();
+  /**
+   * Plans the statement.
+   *
+   * @return the {@link PlannerResult} for the statement
+   * @throws ValidationException if planning detects an invalid statement
+   */
+  PlannerResult plan() throws ValidationException;
+
+  /**
+   * Context available to statement handlers.
+   */
+  interface HandlerContext
+  {
+    /** The planner context for the statement being handled. */
+    PlannerContext plannerContext();
+    /** The SQL engine in use. */
+    SqlEngine engine();
+    /** The Calcite planner in use. */
+    CalcitePlanner planner();
+    /** The query context for the statement. */
+    QueryContext queryContext();
+    /** The default schema. */
+    SchemaPlus defaultSchema();
+    /** The JSON object mapper available to handlers. */
+    ObjectMapper jsonMapper();
+    /** The time zone available to handlers. */
+    DateTimeZone timeZone();
+  }
+
+  /**
+   * Base class for handlers: holds the {@link HandlerContext} and the set of resource actions.
+   */
+  abstract class BaseStatementHandler implements SqlStatementHandler
+  {
+    protected final HandlerContext handlerContext;
+    // Not set by the constructor, so this is null until a subclass assigns it.
+    protected Set<ResourceAction> resourceActions;
+
+    protected BaseStatementHandler(HandlerContext handlerContext)
+    {
+      this.handlerContext = handlerContext;
+    }
+
+    /**
+     * Returns the current value of the {@code resourceActions} field, which is null until a subclass assigns it.
+     */
+    @Override
+    public Set<ResourceAction> resourceActions()
+    {
+      return resourceActions;
+    }
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 185d96b945..fcf9fb754d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -133,7 +133,7 @@ public class DruidQuery
   @Nullable
   private final Sorting sorting;
 
-  private final Query query;
+  private final Query<?> query;
   private final RowSignature outputRowSignature;
   private final RelDataType outputRowType;
   private final VirtualColumnRegistry virtualColumnRegistry;
@@ -795,7 +795,7 @@ public class DruidQuery
     return outputRowSignature;
   }
 
-  public Query getQuery()
+  public Query<?> getQuery()
   {
     return query;
   }
@@ -806,7 +806,7 @@ public class DruidQuery
    *
    * @return Druid query
    */
-  private Query computeQuery()
+  private Query<?> computeQuery()
   {
     if (dataSource instanceof QueryDataSource) {
       // If there is a subquery, then we prefer the outer query to be a groupBy if possible, since this potentially
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
index 9043577a7d..7bf305d42b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
@@ -28,7 +28,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
 import javax.annotation.Nullable;
 import java.util.Set;
 
-public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
+public abstract class DruidRel<T extends DruidRel<?>> extends AbstractRelNode
 {
   private final PlannerContext plannerContext;
 
@@ -45,7 +45,7 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
   @Nullable
   public abstract PartialDruidQuery getPartialDruidQuery();
 
-  public QueryResponse runQuery()
+  public QueryResponse<Object[]> runQuery()
   {
     // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
     // is the outermost query, and it will actually get run as a native query. Druid's native query layer will
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
index de1bc8b758..f754fc0cf0 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
@@ -107,11 +107,11 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
 
   @Override
   @SuppressWarnings({"unchecked", "rawtypes"})
-  public QueryResponse runQuery()
+  public QueryResponse<Object[]> runQuery()
   {
     // Lazy: run each query in sequence, not all at once.
     if (limit == 0) {
-      return new QueryResponse(Sequences.empty(), ResponseContext.createEmpty());
+      return new QueryResponse<Object[]>(Sequences.empty(), ResponseContext.createEmpty());
     } else {
 
       // We run the first rel here for two reasons:
@@ -122,10 +122,10 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
       //    is also sub-optimal as it would consume parallel query resources and potentially starve the system.
       //    Instead, we only return the headers from the first query and potentially exception out and fail the query
       //    if there are any response headers that come from subsequent queries that are correctness concerns
-      final QueryResponse queryResponse = ((DruidRel) rels.get(0)).runQuery();
+      final QueryResponse<Object[]> queryResponse = ((DruidRel) rels.get(0)).runQuery();
 
-      final List<Sequence<Object>> firstAsList = Collections.singletonList(queryResponse.getResults());
-      final Iterable<Sequence<Object>> theRestTransformed = FluentIterable
+      final List<Sequence<Object[]>> firstAsList = Collections.singletonList(queryResponse.getResults());
+      final Iterable<Sequence<Object[]>> theRestTransformed = FluentIterable
           .from(rels.subList(1, rels.size()))
           .transform(
               rel -> {
@@ -144,10 +144,10 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
               }
           );
 
-      final Iterable<Sequence<Object>> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed);
+      final Iterable<Sequence<Object[]>> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed);
 
       final Sequence returnSequence = Sequences.concat(recombinedSequences);
-      return new QueryResponse(
+      return new QueryResponse<Object[]>(
           limit > 0 ? returnSequence.limit(limit) : returnSequence,
           queryResponse.getResponseContext()
       );
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
index f045769ec9..27a5462ba7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
@@ -94,7 +94,7 @@ public class NativeQueryMaker implements QueryMaker
   }
 
   @Override
-  public QueryResponse runQuery(final DruidQuery druidQuery)
+  public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
   {
     final Query<?> query = druidQuery.getQuery();
 
@@ -173,7 +173,8 @@ public class NativeQueryMaker implements QueryMaker
                              .orElseGet(query::getIntervals);
   }
 
-  private <T> QueryResponse execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes)
+  @SuppressWarnings("unchecked")
+  private <T> QueryResponse<Object[]> execute(Query<?> query, final List<String> newFields, final List<SqlTypeName> newTypes)
   {
     Hook.QUERY_PLAN.run(query);
 
@@ -195,14 +196,19 @@ public class NativeQueryMaker implements QueryMaker
     // otherwise it won't yet be initialized. (A bummer, since ideally, we'd verify the toolChest exists and can do
     // array-based results before starting the query; but in practice we don't expect this to happen since we keep
     // tight control over which query types we generate in the SQL layer. They all support array-based results.)
-    final QueryResponse results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult);
-
-
-    return mapResultSequence(results, queryLifecycle.getToolChest(), query, newFields, newTypes);
+    final QueryResponse<T> results = queryLifecycle.runSimple((Query<T>) query, authenticationResult, authorizationResult);
+
+    return mapResultSequence(
+        results,
+        (QueryToolChest<T, Query<T>>) queryLifecycle.getToolChest(),
+        (Query<T>) query,
+        newFields,
+        newTypes
+    );
   }
 
-  private <T> QueryResponse mapResultSequence(
-      final QueryResponse results,
+  private <T> QueryResponse<Object[]> mapResultSequence(
+      final QueryResponse<T> results,
       final QueryToolChest<T, Query<T>> toolChest,
       final Query<T> query,
       final List<String> newFields,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
index 8acc02230c..8039d60c3e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
@@ -33,5 +33,5 @@ public interface QueryMaker
    * created for. The returned arrays match the row type given by {@link SqlEngine#resultTypeForSelect} or
    * {@link SqlEngine#resultTypeForInsert}, depending on the nature of the statement.
    */
-  QueryResponse runQuery(DruidQuery druidQuery);
+  QueryResponse<Object[]> runQuery(DruidQuery druidQuery);
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index db7f7d4ae7..dc4bfbcfb6 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -128,7 +128,7 @@ public class SqlResource
     try {
       Thread.currentThread().setName(StringUtils.format("sql[%s]", sqlQueryId));
       ResultSet resultSet = stmt.plan();
-      final QueryResponse response = resultSet.run();
+      final QueryResponse<Object[]> response = resultSet.run();
       final SqlRowTransformer rowTransformer = resultSet.createRowTransformer();
       final Yielder<Object[]> finalYielder = Yielders.each(response.getResults());
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 92d0a5773f..46d3e7fccd 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -43,7 +43,7 @@ import org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
-import org.apache.druid.sql.calcite.planner.DruidPlanner;
+import org.apache.druid.sql.calcite.planner.IngestHandler;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.util.CalciteTests;
@@ -204,7 +204,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
   {
     testIngestionQuery()
         .sql("INSERT INTO dst (foo, bar) SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "INSERT with target column list is not supported.")
+        .expectValidationError(SqlPlanningException.class, "INSERT with a target column list is not supported.")
         .verify();
   }
 
@@ -226,7 +226,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
         .sql("INSERT INTO dst SELECT * FROM INFORMATION_SCHEMA.COLUMNS PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot query table [INFORMATION_SCHEMA.COLUMNS] with SQL engine 'ingestion-test'."
+            "Cannot query table INFORMATION_SCHEMA.COLUMNS with SQL engine 'ingestion-test'."
         )
         .verify();
   }
@@ -238,7 +238,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
         .sql("INSERT INTO INFORMATION_SCHEMA.COLUMNS SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot INSERT into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource (schema = druid)."
+            "Cannot INSERT into INFORMATION_SCHEMA.COLUMNS because it is not a Druid datasource."
         )
         .verify();
   }
@@ -250,7 +250,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
         .sql("INSERT INTO view.aview SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot INSERT into [view.aview] because it is not a Druid datasource (schema = druid)."
+            "Cannot INSERT into view.aview because it is not a Druid datasource."
         )
         .verify();
   }
@@ -280,7 +280,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
         .sql("INSERT INTO nonexistent.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot INSERT into [nonexistent.dst] because it is not a Druid datasource (schema = druid)."
+            "Cannot INSERT into nonexistent.dst because it is not a Druid datasource."
         )
         .verify();
   }
@@ -435,7 +435,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
         )
         .expectValidationError(
             SqlPlanningException.class,
-            "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"
+            "CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"
         )
         .verify();
   }
@@ -517,7 +517,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
     }
     catch (SqlPlanningException e) {
       Assert.assertEquals(
-          "Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.",
+          "Cannot have ORDER BY on an INSERT statement, use CLUSTERED BY instead.",
           e.getMessage()
       );
     }
@@ -561,7 +561,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
     }
     catch (SqlPlanningException e) {
       Assert.assertEquals(
-          "Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.",
+          "Cannot have ORDER BY on an INSERT statement, use CLUSTERED BY instead.",
           e.getMessage()
       );
     }
@@ -796,7 +796,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
         .sql("INSERT INTO t SELECT dim1, dim2 || '-lol' FROM foo PARTITIONED BY ALL")
         .expectValidationError(
             SqlPlanningException.class,
-            DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
+            IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
         )
         .verify();
   }
@@ -808,7 +808,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
         .sql("INSERT INTO t SELECT __time, dim1 AS EXPR$0 FROM foo PARTITIONED BY ALL")
         .expectValidationError(
             SqlPlanningException.class,
-            DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
+            IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
         )
         .verify();
   }
@@ -822,7 +822,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
              + "(SELECT __time, LOWER(dim1) FROM foo) PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
+            IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
         )
         .verify();
   }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
index a244abef6a..eb31a17fc9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
@@ -255,7 +255,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1 PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.")
+        .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE statement, use CLUSTERED BY instead.")
         .verify();
   }
 
@@ -390,7 +390,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst (foo, bar) OVERWRITE ALL SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "REPLACE with target column list is not supported.")
+        .expectValidationError(SqlPlanningException.class, "REPLACE with a target column list is not supported.")
         .verify();
   }
 
@@ -408,7 +408,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst OVERWRITE ALL SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo CLUSTERED BY dim1")
-        .expectValidationError(SqlPlanningException.class, "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause")
+        .expectValidationError(SqlPlanningException.class, "CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause")
         .verify();
   }
 
@@ -417,7 +417,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.")
+        .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.")
         .verify();
   }
 
@@ -426,7 +426,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst OVERWRITE SELECT * FROM foo PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.")
+        .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.")
         .verify();
   }
 
@@ -437,7 +437,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
         .sql("REPLACE INTO INFORMATION_SCHEMA.COLUMNS OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot REPLACE into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource (schema = druid)."
+            "Cannot REPLACE into INFORMATION_SCHEMA.COLUMNS because it is not a Druid datasource."
         )
         .verify();
   }
@@ -449,7 +449,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
         .sql("REPLACE INTO view.aview OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot REPLACE into [view.aview] because it is not a Druid datasource (schema = druid)."
+            "Cannot REPLACE into view.aview because it is not a Druid datasource."
         )
         .verify();
   }
@@ -479,7 +479,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
         .sql("REPLACE INTO nonexistent.dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot REPLACE into [nonexistent.dst] because it is not a Druid datasource (schema = druid)."
+            "Cannot REPLACE into nonexistent.dst because it is not a Druid datasource."
         )
         .verify();
   }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java b/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
index 22c004947a..8562ce29be 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
@@ -45,7 +45,7 @@ public class TestInsertQueryMaker implements QueryMaker
   }
 
   @Override
-  public QueryResponse runQuery(final DruidQuery druidQuery)
+  public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
   {
     // Don't actually execute anything, but do record information that tests will check for.
 
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index d1bb098b84..88d754e0fa 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -2077,7 +2077,7 @@ public class SqlResourceTest extends CalciteTestBase
       return new ResultSet(plannerResult)
       {
         @Override
-        public QueryResponse run()
+        public QueryResponse<Object[]> run()
         {
           final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
               Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
@@ -2085,12 +2085,12 @@ public class SqlResourceTest extends CalciteTestBase
           final NonnullPair<CountDownLatch, Boolean> executeLatch = executeLatchSupplier.get();
           if (executeLatch != null) {
             if (executeLatch.rhs) {
-              final QueryResponse resp = super.run();
+              final QueryResponse<Object[]> resp = super.run();
               Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
               executeLatch.lhs.countDown();
               final ResponseContext respContext = resp.getResponseContext();
               respContext.merge(responseContextSupplier.get());
-              return new QueryResponse(sequence, respContext);
+              return new QueryResponse<>(sequence, respContext);
             } else {
               try {
                 if (!executeLatch.lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
@@ -2103,11 +2103,11 @@ public class SqlResourceTest extends CalciteTestBase
             }
           }
 
-          final QueryResponse resp = super.run();
+          final QueryResponse<Object[]> resp = super.run();
           Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
           final ResponseContext respContext = resp.getResponseContext();
           respContext.merge(responseContextSupplier.get());
-          return new QueryResponse(sequence, respContext);
+          return new QueryResponse<>(sequence, respContext);
         }
       };
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org