You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ch...@apache.org on 2022/09/12 08:40:15 UTC

[druid] branch master updated: Expose HTTP Response headers from SqlResource (#13052)

This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ba0075c0c Expose HTTP Response headers from SqlResource (#13052)
5ba0075c0c is described below

commit 5ba0075c0c5054b7f11b02201e80e952dcf0e691
Author: imply-cheddar <86...@users.noreply.github.com>
AuthorDate: Mon Sep 12 17:40:06 2022 +0900

    Expose HTTP Response headers from SqlResource (#13052)
    
    * Expose HTTP Response headers from SqlResource
    
    This change makes the SqlResource expose HTTP response
    headers in the same way that the QueryResource exposes them.
    
    Fundamentally, the change is to pipe the QueryResponse
    object all the way through to the Resource so that it can
    populate response headers.  There is also some code
    cleanup around DI, as there was a superfluous FactoryFactory
    class muddying things up.
---
 .../apache/druid/benchmark/query/SqlBenchmark.java |   2 +-
 .../benchmark/query/SqlExpressionBenchmark.java    |   2 +-
 .../benchmark/query/SqlNestedDataBenchmark.java    |   2 +-
 .../benchmark/query/SqlVsNativeBenchmark.java      |   2 +-
 .../druid/guice/annotations/NativeQuery.java       |  26 +--
 .../org/apache/druid/guice/annotations/MSQ.java    |  26 +--
 .../org/apache/druid/msq/guice/MSQSqlModule.java   |  20 ++
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    |   6 +-
 .../org/apache/druid/msq/sql/MSQTaskSqlEngine.java |   2 -
 .../org/apache/druid/msq/sql/SqlTaskResource.java  |  11 +-
 .../org/apache/druid/msq/test/MSQTestBase.java     |  17 +-
 .../druid/query/context/ResponseContext.java       |   5 +-
 .../org/apache/druid/query/TestBufferPool.java     |  36 ++--
 .../druid/query/groupby/TestGroupByBuffers.java    |   4 +-
 .../org/apache/druid/server/QueryLifecycle.java    |  55 +++---
 .../org/apache/druid/server/QueryResource.java     |  80 ++++----
 .../org/apache/druid/server/QueryResponse.java     |  39 ++--
 .../initialization/AuthenticatorMapperModule.java  |  15 +-
 .../druid/server/security/AuthenticatorMapper.java |   2 -
 .../druid/server/security/AuthorizerMapper.java    |   3 -
 sql/pom.xml                                        |   5 +
 .../java/org/apache/druid/sql/DirectStatement.java |   6 +-
 .../org/apache/druid/sql/SqlStatementFactory.java  |  38 +++-
 .../druid/sql/SqlStatementFactoryFactory.java      | 105 -----------
 .../druid/sql/avatica/DruidJdbcResultSet.java      |   2 +-
 .../org/apache/druid/sql/avatica/DruidMeta.java    |  13 +-
 .../druid/sql/calcite/planner/DruidPlanner.java    |  55 +++---
 .../druid/sql/calcite/planner/PlannerResult.java   |   8 +-
 .../org/apache/druid/sql/calcite/rel/DruidRel.java |   4 +-
 .../druid/sql/calcite/rel/DruidUnionRel.java       |  55 +++++-
 .../druid/sql/calcite/run/NativeQueryMaker.java    |  45 +++--
 .../apache/druid/sql/calcite/run/QueryMaker.java   |   4 +-
 .../sql/calcite/schema/SegmentMetadataCache.java   |   3 +-
 .../java/org/apache/druid/sql/guice/SqlModule.java |  82 ++++++++-
 .../org/apache/druid/sql/http/SqlHttpModule.java   |   2 +
 .../org/apache/druid/sql/http/SqlResource.java     |  91 +++++----
 .../org/apache/druid/sql/SqlStatementTest.java     |  28 +--
 .../druid/sql/avatica/DruidAvaticaHandlerTest.java |   5 +-
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |   2 +-
 .../druid/sql/calcite/CalciteJoinQueryTest.java    |   2 +-
 .../calcite/SqlVectorizedExpressionSanityTest.java |   4 +-
 .../druid/sql/calcite/TestInsertQueryMaker.java    |   8 +-
 .../calcite/schema/SegmentMetadataCacheTest.java   |   4 +-
 .../druid/sql/calcite/util/CalciteTests.java       |  23 ++-
 .../org/apache/druid/sql/guice/SqlModuleTest.java  |  10 +-
 .../apache/druid/sql/http/SqlHttpModuleTest.java   |  27 +--
 .../org/apache/druid/sql/http/SqlResourceTest.java | 203 +++++++++++++++------
 47 files changed, 696 insertions(+), 493 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index 8b65ba0ebe..35974e4430 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -518,7 +518,7 @@ public class SqlBenchmark
     final String sql = QUERIES.get(Integer.parseInt(query));
     try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) {
       final PlannerResult plannerResult = planner.plan();
-      final Sequence<Object[]> resultSequence = plannerResult.run();
+      final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
       final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
       blackhole.consume(lastRow);
     }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
index f7e3a3a7a4..e1c27afc98 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
@@ -354,7 +354,7 @@ public class SqlExpressionBenchmark
     final String sql = QUERIES.get(Integer.parseInt(query));
     try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) {
       final PlannerResult plannerResult = planner.plan();
-      final Sequence<Object[]> resultSequence = plannerResult.run();
+      final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
       final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
       blackhole.consume(lastRow);
     }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
index a4876421c6..ab3f5de9ce 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
@@ -320,7 +320,7 @@ public class SqlNestedDataBenchmark
     final String sql = QUERIES.get(Integer.parseInt(query));
     try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) {
       final PlannerResult plannerResult = planner.plan();
-      final Sequence<Object[]> resultSequence = plannerResult.run();
+      final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
       final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
       blackhole.consume(lastRow);
     }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
index d7d2d7ad2c..b11188eb98 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
@@ -169,7 +169,7 @@ public class SqlVsNativeBenchmark
   {
     try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sqlQuery, new QueryContext())) {
       final PlannerResult plannerResult = planner.plan();
-      final Sequence<Object[]> resultSequence = plannerResult.run();
+      final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
       final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
       blackhole.consume(lastRow);
     }
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java b/core/src/main/java/org/apache/druid/guice/annotations/NativeQuery.java
similarity index 58%
copy from sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
copy to core/src/main/java/org/apache/druid/guice/annotations/NativeQuery.java
index 4d7a5d26bb..60b0b191a3 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
+++ b/core/src/main/java/org/apache/druid/guice/annotations/NativeQuery.java
@@ -17,20 +17,24 @@
  * under the License.
  */
 
-package org.apache.druid.sql.http;
+package org.apache.druid.guice.annotations;
 
-import com.google.inject.Binder;
-import com.google.inject.Module;
-import org.apache.druid.guice.Jerseys;
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
 /**
- * The Module responsible for providing bindings to the SQL http endpoint
+ * Binding annotation for implements of interfaces that are focused on running native queries.  This is generally
+ * contrasted with the MSQ annotation.
+ *
+ * @see Parent
  */
-public class SqlHttpModule implements Module
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@BindingAnnotation
+public @interface NativeQuery
 {
-  @Override
-  public void configure(Binder binder)
-  {
-    Jerseys.addResource(binder, SqlResource.class);
-  }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/guice/annotations/MSQ.java
similarity index 59%
copy from sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
copy to extensions-core/multi-stage-query/src/main/java/org/apache/druid/guice/annotations/MSQ.java
index 4d7a5d26bb..c480168de2 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/guice/annotations/MSQ.java
@@ -17,20 +17,24 @@
  * under the License.
  */
 
-package org.apache.druid.sql.http;
+package org.apache.druid.guice.annotations;
 
-import com.google.inject.Binder;
-import com.google.inject.Module;
-import org.apache.druid.guice.Jerseys;
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
 /**
- * The Module responsible for providing bindings to the SQL http endpoint
+ * Binding annotation for implements of interfaces that are MSQ (MultiStageQuery) focused.  This is generally
+ * contrasted with the NativeQ annotation.
+ *
+ * @see Parent
  */
-public class SqlHttpModule implements Module
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@BindingAnnotation
+public @interface MSQ
 {
-  @Override
-  public void configure(Binder binder)
-  {
-    Jerseys.addResource(binder, SqlResource.class);
-  }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java
index 9aea33f407..e2c47c2a2d 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java
@@ -22,10 +22,16 @@ package org.apache.druid.msq.guice;
 import com.fasterxml.jackson.databind.Module;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
+import com.google.inject.Provides;
 import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.annotations.LoadScope;
+import org.apache.druid.guice.annotations.MSQ;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.metadata.input.InputSourceModule;
+import org.apache.druid.msq.sql.MSQTaskSqlEngine;
+import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.SqlToolbox;
 import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
 import org.apache.druid.sql.guice.SqlBindings;
 
@@ -54,7 +60,21 @@ public class MSQSqlModule implements DruidModule
     // We want this module to bring InputSourceModule along for the ride.
     binder.install(new InputSourceModule());
 
+    binder.bind(MSQTaskSqlEngine.class).in(LazySingleton.class);
+
     // Set up the EXTERN macro.
     SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
   }
+
+
+  @Provides
+  @MSQ
+  @LazySingleton
+  public SqlStatementFactory makeMSQSqlStatementFactory(
+      final MSQTaskSqlEngine engine,
+      SqlToolbox toolbox
+  )
+  {
+    return new SqlStatementFactory(toolbox.withEngine(engine));
+  }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 53bb5bf846..e2754cacbe 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -31,7 +31,6 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.msq.exec.MSQTasks;
 import org.apache.druid.msq.indexing.ColumnMapping;
@@ -49,6 +48,7 @@ import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
 import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
@@ -105,7 +105,7 @@ public class MSQTaskQueryMaker implements QueryMaker
   }
 
   @Override
-  public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
+  public QueryResponse runQuery(final DruidQuery druidQuery)
   {
     String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId());
 
@@ -259,7 +259,7 @@ public class MSQTaskQueryMaker implements QueryMaker
     );
 
     FutureUtils.getUnchecked(overlordClient.runTask(taskId, controllerTask), true);
-    return Sequences.simple(Collections.singletonList(new Object[]{taskId}));
+    return QueryResponse.withEmptyContext(Sequences.simple(Collections.singletonList(new Object[]{taskId})));
   }
 
   private static Map<String, ColumnType> buildAggregationIntermediateTypeMap(final DruidQuery druidQuery)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index 593eccac4f..226c4ffcdb 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -32,7 +32,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.calcite.util.Pair;
-import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -55,7 +54,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-@LazySingleton
 public class MSQTaskSqlEngine implements SqlEngine
 {
   public static final Set<String> SYSTEM_CONTEXT_PARAMETERS =
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java
index e49fda973a..1725060806 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java
@@ -25,6 +25,7 @@ import com.google.common.io.CountingOutputStream;
 import com.google.inject.Inject;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.druid.common.exception.SanitizableException;
+import org.apache.druid.guice.annotations.MSQ;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Yielder;
@@ -37,6 +38,7 @@ import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryTimeoutException;
 import org.apache.druid.query.QueryUnsupportedException;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AuthorizationUtils;
@@ -47,7 +49,6 @@ import org.apache.druid.sql.HttpStatement;
 import org.apache.druid.sql.SqlPlanningException;
 import org.apache.druid.sql.SqlRowTransformer;
 import org.apache.druid.sql.SqlStatementFactory;
-import org.apache.druid.sql.SqlStatementFactoryFactory;
 import org.apache.druid.sql.http.ResultFormat;
 import org.apache.druid.sql.http.SqlQuery;
 import org.apache.druid.sql.http.SqlResource;
@@ -91,14 +92,13 @@ public class SqlTaskResource
 
   @Inject
   public SqlTaskResource(
-      final MSQTaskSqlEngine engine,
-      final SqlStatementFactoryFactory sqlStatementFactoryFactory,
+      final @MSQ SqlStatementFactory sqlStatementFactory,
       final ServerConfig serverConfig,
       final AuthorizerMapper authorizerMapper,
       final ObjectMapper jsonMapper
   )
   {
-    this.sqlStatementFactory = sqlStatementFactoryFactory.factorize(engine);
+    this.sqlStatementFactory = sqlStatementFactory;
     this.serverConfig = serverConfig;
     this.authorizerMapper = authorizerMapper;
     this.jsonMapper = jsonMapper;
@@ -147,7 +147,8 @@ public class SqlTaskResource
     final String sqlQueryId = stmt.sqlQueryId();
     try {
       final DirectStatement.ResultSet plan = stmt.plan();
-      final Sequence<Object[]> sequence = plan.run();
+      final QueryResponse response = plan.run();
+      final Sequence sequence = response.getResults();
       final SqlRowTransformer rowTransformer = plan.createRowTransformer();
       final boolean isTaskStruct = MSQTaskSqlEngine.TASK_STRUCT_FIELD_NAMES.equals(rowTransformer.getFieldList());
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index ae641fc7c7..880c6f4ade 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -35,6 +35,7 @@ import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.TypeLiteral;
+import com.google.inject.util.Modules;
 import com.google.inject.util.Providers;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -47,6 +48,7 @@ import org.apache.druid.guice.IndexingServiceTuningConfigModule;
 import org.apache.druid.guice.JoinableFactoryModule;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.MSQ;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.hll.HyperLogLogCollector;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -125,6 +127,7 @@ import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.sql.DirectStatement;
 import org.apache.druid.sql.SqlQueryPlus;
 import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.SqlToolbox;
 import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
 import org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
@@ -341,7 +344,17 @@ public class MSQTestBase extends BaseCalciteQueryTest
         new JoinableFactoryModule(),
         new IndexingServiceTuningConfigModule(),
         new MSQIndexingModule(),
-        new MSQSqlModule(),
+        Modules.override(new MSQSqlModule()).with(
+            binder -> {
+              // Our Guice configuration currently requires bindings to exist even if they aren't ever used, the
+              // following bindings are overriding other bindings that end up needing a lot more dependencies.
+              // We replace the bindings with something that returns null to make things more brittle in case they
+              // actually are used somewhere in the test.
+              binder.bind(SqlStatementFactory.class).annotatedWith(MSQ.class).toProvider(Providers.of(null));
+              binder.bind(SqlToolbox.class).toProvider(Providers.of(null));
+              binder.bind(MSQTaskSqlEngine.class).toProvider(Providers.of(null));
+            }
+        ),
         new MSQExternalDataSourceModule()
     ));
 
@@ -580,7 +593,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
         )
     );
 
-    final List<Object[]> sequence = stmt.execute().toList();
+    final List<Object[]> sequence = stmt.execute().getResults().toList();
     return (String) Iterables.getOnlyElement(sequence)[0];
   }
 
diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
index 163b70383e..a943297d17 100644
--- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
+++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
@@ -39,7 +39,6 @@ import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -733,6 +732,10 @@ public abstract class ResponseContext
    */
   public void merge(ResponseContext responseContext)
   {
+    if (responseContext == null) {
+      return;
+    }
+
     responseContext.getDelegate().forEach((key, newValue) -> {
       if (newValue != null) {
         add(key, newValue);
diff --git a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java
index e841397c0e..10690d31be 100644
--- a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java
+++ b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java
@@ -20,7 +20,6 @@
 package org.apache.druid.query;
 
 import com.google.common.collect.Iterables;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.collections.BlockingPool;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
@@ -31,26 +30,29 @@ import org.apache.druid.utils.CloseableUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
 /**
  * A buffer pool that throws away buffers when they are "returned" to the pool. Useful for tests that need to make
  * many pools and use them one at a time.
- *
+ * <p>
  * This pool implements {@link BlockingPool}, but never blocks. It returns immediately if resources are available;
  * otherwise it returns an empty list immediately. This is also useful for tests, because it allows "timeouts" to
  * happen immediately and therefore speeds up tests.
  */
 public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool<ByteBuffer>
 {
+  private final AtomicLong takeCount = new AtomicLong(0);
+  private final ConcurrentHashMap<Long, RuntimeException> takenFromMap = new ConcurrentHashMap<>();
+
   private final Supplier<ResourceHolder<ByteBuffer>> generator;
   private final int maxCount;
 
-  @GuardedBy("this")
-  private long numOutstanding;
-
   private TestBufferPool(final Supplier<ResourceHolder<ByteBuffer>> generator, final int maxCount)
   {
     this.generator = generator;
@@ -60,7 +62,8 @@ public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool
   public static TestBufferPool onHeap(final int bufferSize, final int maxCount)
   {
     return new TestBufferPool(
-        () -> new ReferenceCountingResourceHolder<>(ByteBuffer.allocate(bufferSize), () -> {}),
+        () -> new ReferenceCountingResourceHolder<>(ByteBuffer.allocate(bufferSize), () -> {
+        }),
         maxCount
     );
   }
@@ -102,20 +105,20 @@ public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool
   public List<ReferenceCountingResourceHolder<ByteBuffer>> takeBatch(int elementNum)
   {
     synchronized (this) {
-      if (numOutstanding + elementNum <= maxCount) {
+      if (takenFromMap.size() + elementNum <= maxCount) {
         final List<ReferenceCountingResourceHolder<ByteBuffer>> retVal = new ArrayList<>();
 
         try {
           for (int i = 0; i < elementNum; i++) {
             final ResourceHolder<ByteBuffer> holder = generator.get();
             final ByteBuffer o = holder.get();
+            final long ticker = takeCount.getAndIncrement();
+            takenFromMap.put(ticker, new RuntimeException());
+
             retVal.add(new ReferenceCountingResourceHolder<>(o, () -> {
-              synchronized (this) {
-                numOutstanding--;
-                holder.close();
-              }
+              takenFromMap.remove(ticker);
+              holder.close();
             }));
-            numOutstanding++;
           }
         }
         catch (Throwable e) {
@@ -131,8 +134,11 @@ public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool
 
   public long getOutstandingObjectCount()
   {
-    synchronized (this) {
-      return numOutstanding;
-    }
+    return takenFromMap.size();
+  }
+
+  public Collection<RuntimeException> getOutstandingExceptionsCreated()
+  {
+    return takenFromMap.values();
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java b/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java
index f83160b195..c25d66b668 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java
@@ -85,7 +85,9 @@ public class TestGroupByBuffers implements Closeable
     }
 
     if (mergePool != null) {
-      Assert.assertEquals(0, mergePool.getOutstandingObjectCount());
+      if (mergePool.getOutstandingObjectCount() != 0) {
+        throw mergePool.getOutstandingExceptionsCreated().iterator().next();
+      }
       mergePool = null;
     }
   }
diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
index 1bdba45151..b4d80d01d9 100644
--- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
+++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
@@ -141,7 +141,7 @@ public class QueryLifecycle
    * @return results
    */
   @SuppressWarnings("unchecked")
-  public <T> Sequence<T> runSimple(
+  public <T> QueryResponse runSimple(
       final Query<T> query,
       final AuthenticationResult authenticationResult,
       final Access authorizationResult
@@ -151,13 +151,14 @@ public class QueryLifecycle
 
     final Sequence<T> results;
 
+    final QueryResponse queryResponse;
     try {
       preAuthorized(authenticationResult, authorizationResult);
       if (!authorizationResult.isAllowed()) {
         throw new ISE("Unauthorized");
       }
 
-      final QueryResponse queryResponse = execute();
+      queryResponse = execute();
       results = queryResponse.getResults();
     }
     catch (Throwable e) {
@@ -165,16 +166,25 @@ public class QueryLifecycle
       throw e;
     }
 
-    return Sequences.wrap(
-        results,
-        new SequenceWrapper()
-        {
-          @Override
-          public void after(final boolean isDone, final Throwable thrown)
-          {
-            emitLogsAndMetrics(thrown, null, -1);
-          }
-        }
+    /*
+     * It seems extremely weird that the below code is wrapping the Sequence in order to emitLogsAndMetrics.
+     * The Sequence was returned by the call to execute, it would be worthwile to figure out why this wrapping
+     * cannot be moved into execute().  We leave this as an exercise for the future, however as this oddity
+     * was discovered while just trying to expose HTTP response headers
+     */
+    return new QueryResponse(
+        Sequences.wrap(
+            results,
+            new SequenceWrapper()
+            {
+              @Override
+              public void after(final boolean isDone, final Throwable thrown)
+              {
+                emitLogsAndMetrics(thrown, null, -1);
+              }
+            }
+        ),
+        queryResponse.getResponseContext()
     );
   }
 
@@ -439,25 +449,4 @@ public class QueryLifecycle
     DONE
   }
 
-  public static class QueryResponse
-  {
-    private final Sequence results;
-    private final ResponseContext responseContext;
-
-    private QueryResponse(final Sequence results, final ResponseContext responseContext)
-    {
-      this.results = results;
-      this.responseContext = responseContext;
-    }
-
-    public Sequence getResults()
-    {
-      return results;
-    }
-
-    public ResponseContext getResponseContext()
-    {
-      return responseContext;
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java
index 590347735b..ea225efbae 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -200,7 +200,7 @@ public class QueryResource implements QueryCountStatsProvider
         throw new ForbiddenException(authResult.toString());
       }
 
-      final QueryLifecycle.QueryResponse queryResponse = queryLifecycle.execute();
+      final QueryResponse queryResponse = queryLifecycle.execute();
       final Sequence<?> results = queryResponse.getResults();
       final ResponseContext responseContext = queryResponse.getResponseContext();
       final String prevEtag = getPreviousEtag(req);
@@ -255,41 +255,11 @@ public class QueryResource implements QueryCountStatsProvider
             )
             .header(QUERY_ID_RESPONSE_HEADER, queryId);
 
-        transferEntityTag(responseContext, responseBuilder);
-
-        DirectDruidClient.removeMagicResponseContextFields(responseContext);
-
-        // Limit the response-context header, see https://github.com/apache/druid/issues/2331
-        // Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
-        // and encodes the string using ASCII, so 1 char is = 1 byte
-        final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith(
-            jsonMapper,
-            responseContextConfig.getMaxResponseContextHeaderSize()
+        attachResponseContextToHttpResponse(queryId, responseContext, responseBuilder, jsonMapper,
+                                            responseContextConfig, selfNode
         );
 
-        if (serializationResult.isTruncated()) {
-          final String logToPrint = StringUtils.format(
-              "Response Context truncated for id [%s]. Full context is [%s].",
-              queryId,
-              serializationResult.getFullResult()
-          );
-          if (responseContextConfig.shouldFailOnTruncatedResponseContext()) {
-            log.error(logToPrint);
-            throw new QueryInterruptedException(
-                new TruncatedResponseContextException(
-                    "Serialized response context exceeds the max size[%s]",
-                    responseContextConfig.getMaxResponseContextHeaderSize()
-                ),
-                selfNode.getHostAndPortToUse()
-            );
-          } else {
-            log.warn(logToPrint);
-          }
-        }
-
-        return responseBuilder
-            .header(HEADER_RESPONSE_CONTEXT, serializationResult.getResult())
-            .build();
+        return responseBuilder.build();
       }
       catch (QueryException e) {
         // make sure to close yielder if anything happened before starting to serialize the response.
@@ -358,6 +328,48 @@ public class QueryResource implements QueryCountStatsProvider
     }
   }
 
+  public static void attachResponseContextToHttpResponse(
+      String queryId,
+      ResponseContext responseContext,
+      Response.ResponseBuilder responseBuilder,
+      ObjectMapper jsonMapper, ResponseContextConfig responseContextConfig, DruidNode selfNode
+  ) throws JsonProcessingException
+  {
+    transferEntityTag(responseContext, responseBuilder);
+
+    DirectDruidClient.removeMagicResponseContextFields(responseContext);
+
+    // Limit the response-context header, see https://github.com/apache/druid/issues/2331
+    // Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
+    // and encodes the string using ASCII, so 1 char is = 1 byte
+    final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith(
+        jsonMapper,
+        responseContextConfig.getMaxResponseContextHeaderSize()
+    );
+
+    if (serializationResult.isTruncated()) {
+      final String logToPrint = StringUtils.format(
+          "Response Context truncated for id [%s]. Full context is [%s].",
+          queryId,
+          serializationResult.getFullResult()
+      );
+      if (responseContextConfig.shouldFailOnTruncatedResponseContext()) {
+        log.error(logToPrint);
+        throw new QueryInterruptedException(
+            new TruncatedResponseContextException(
+                "Serialized response context exceeds the max size[%s]",
+                responseContextConfig.getMaxResponseContextHeaderSize()
+            ),
+            selfNode.getHostAndPortToUse()
+        );
+      } else {
+        log.warn(logToPrint);
+      }
+    }
+
+    responseBuilder.header(HEADER_RESPONSE_CONTEXT, serializationResult.getResult());
+  }
+
   private Query<?> readQuery(
       final HttpServletRequest req,
       final InputStream in,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java b/server/src/main/java/org/apache/druid/server/QueryResponse.java
similarity index 57%
copy from sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
copy to server/src/main/java/org/apache/druid/server/QueryResponse.java
index 72dae85d0e..69908aee94 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResponse.java
@@ -17,21 +17,34 @@
  * under the License.
  */
 
-package org.apache.druid.sql.calcite.run;
+package org.apache.druid.server;
 
 import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.sql.calcite.rel.DruidQuery;
+import org.apache.druid.query.context.ResponseContext;
 
-/**
- * Interface for executing Druid queries. Each one is created by a {@link SqlEngine} and is tied to a
- * specific SQL query.
- */
-public interface QueryMaker
+public class QueryResponse
 {
-  /**
-   * Executes a given Druid query, which is expected to correspond to the SQL query that this QueryMaker was originally
-   * created for. The returned arrays match the row type given by {@link SqlEngine#resultTypeForSelect} or
-   * {@link SqlEngine#resultTypeForInsert}, depending on the nature of the statement.
-   */
-  Sequence<Object[]> runQuery(DruidQuery druidQuery);
+  public static QueryResponse withEmptyContext(Sequence results)
+  {
+    return new QueryResponse(results, ResponseContext.createEmpty());
+  }
+
+  private final Sequence results;
+  private final ResponseContext responseContext;
+
+  public QueryResponse(final Sequence results, final ResponseContext responseContext)
+  {
+    this.results = results;
+    this.responseContext = responseContext;
+  }
+
+  public Sequence getResults()
+  {
+    return results;
+  }
+
+  public ResponseContext getResponseContext()
+  {
+    return responseContext;
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java b/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java
index 56c17218e9..c2e240d8e9 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java
@@ -23,12 +23,10 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import com.google.inject.Provider;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.JsonConfigurator;
 import org.apache.druid.guice.LazySingleton;
-import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
@@ -54,22 +52,18 @@ public class AuthenticatorMapperModule implements DruidModule
     binder.bind(AuthenticatorMapper.class)
           .toProvider(new AuthenticatorMapperProvider())
           .in(LazySingleton.class);
-
-    LifecycleModule.register(binder, AuthenticatorMapper.class);
   }
 
   private static class AuthenticatorMapperProvider implements Provider<AuthenticatorMapper>
   {
     private AuthConfig authConfig;
-    private Injector injector;
     private Properties props;
     private JsonConfigurator configurator;
 
     @Inject
-    public void inject(Injector injector, Properties props, JsonConfigurator configurator)
+    public void inject(AuthConfig authConfig, Properties props, JsonConfigurator configurator)
     {
-      this.authConfig = injector.getInstance(AuthConfig.class);
-      this.injector = injector;
+      this.authConfig = authConfig;
       this.props = props;
       this.configurator = configurator;
     }
@@ -91,7 +85,10 @@ public class AuthenticatorMapperModule implements DruidModule
       }
 
       for (String authenticatorName : authenticators) {
-        final String authenticatorPropertyBase = StringUtils.format(AUTHENTICATOR_PROPERTIES_FORMAT_STRING, authenticatorName);
+        final String authenticatorPropertyBase = StringUtils.format(
+            AUTHENTICATOR_PROPERTIES_FORMAT_STRING,
+            authenticatorName
+        );
         final JsonConfigProvider<Authenticator> authenticatorProvider = JsonConfigProvider.of(
             authenticatorPropertyBase,
             Authenticator.class
diff --git a/server/src/main/java/org/apache/druid/server/security/AuthenticatorMapper.java b/server/src/main/java/org/apache/druid/server/security/AuthenticatorMapper.java
index 5b59fe65e4..4e88869135 100644
--- a/server/src/main/java/org/apache/druid/server/security/AuthenticatorMapper.java
+++ b/server/src/main/java/org/apache/druid/server/security/AuthenticatorMapper.java
@@ -20,12 +20,10 @@
 package org.apache.druid.server.security;
 
 import com.google.common.collect.Lists;
-import org.apache.druid.guice.ManageLifecycle;
 
 import java.util.List;
 import java.util.Map;
 
-@ManageLifecycle
 public class AuthenticatorMapper
 {
   private Map<String, Authenticator> authenticatorMap;
diff --git a/server/src/main/java/org/apache/druid/server/security/AuthorizerMapper.java b/server/src/main/java/org/apache/druid/server/security/AuthorizerMapper.java
index e888cc14f9..5ab2d1d2e1 100644
--- a/server/src/main/java/org/apache/druid/server/security/AuthorizerMapper.java
+++ b/server/src/main/java/org/apache/druid/server/security/AuthorizerMapper.java
@@ -19,11 +19,8 @@
 
 package org.apache.druid.server.security;
 
-import org.apache.druid.guice.ManageLifecycle;
-
 import java.util.Map;
 
-@ManageLifecycle
 public class AuthorizerMapper
 {
   private Map<String, Authorizer> authorizerMap;
diff --git a/sql/pom.xml b/sql/pom.xml
index 846b44ef7f..bbf2eb049e 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -208,6 +208,11 @@
       <artifactId>easymock</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-core</artifactId>
diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
index d73761c825..e173213a0c 100644
--- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
@@ -23,9 +23,9 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
 import org.apache.druid.sql.calcite.planner.DruidPlanner;
@@ -98,7 +98,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
      * Do the actual execute step which allows subclasses to wrap the sequence,
      * as is sometimes needed for testing.
      */
-    public Sequence<Object[]> run()
+    public QueryResponse run()
     {
       try {
         // Check cancellation. Required for SqlResourceTest to work.
@@ -176,7 +176,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
    *
    * @return sequence which delivers query results
    */
-  public Sequence<Object[]> execute()
+  public QueryResponse execute()
   {
     return plan().run();
   }
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlStatementFactory.java b/sql/src/main/java/org/apache/druid/sql/SqlStatementFactory.java
index b1ecde6f4c..c8450d6216 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlStatementFactory.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlStatementFactory.java
@@ -23,11 +23,41 @@ import org.apache.druid.sql.http.SqlQuery;
 
 import javax.servlet.http.HttpServletRequest;
 
-public interface SqlStatementFactory
+/**
+ * A class for the creation of Statements, which happen to be used for Sql.
+ */
+public class SqlStatementFactory
 {
-  HttpStatement httpStatement(SqlQuery sqlQuery, HttpServletRequest req);
+  private final SqlToolbox lifecycleToolbox;
+
+  /**
+   * The construction of these objects in the production code is a bit circuitous.  Specifically, the SqlToolbox
+   * looks like it can be normally injected, except it actually expects to be mutated with a SqlEngine before being
+   * injected.  This is generally accomplished with Guice, examples of which can be seen in the
+   * SqlStatementFactoryModule.
+   *
+   * @param lifecycleToolbox
+   */
+  public SqlStatementFactory(SqlToolbox lifecycleToolbox)
+  {
+    this.lifecycleToolbox = lifecycleToolbox;
+  }
+
+  public HttpStatement httpStatement(
+      final SqlQuery sqlQuery,
+      final HttpServletRequest req
+  )
+  {
+    return new HttpStatement(lifecycleToolbox, sqlQuery, req);
+  }
 
-  DirectStatement directStatement(SqlQueryPlus sqlRequest);
+  public DirectStatement directStatement(final SqlQueryPlus sqlRequest)
+  {
+    return new DirectStatement(lifecycleToolbox, sqlRequest);
+  }
 
-  PreparedStatement preparedStatement(SqlQueryPlus sqlRequest);
+  public PreparedStatement preparedStatement(final SqlQueryPlus sqlRequest)
+  {
+    return new PreparedStatement(lifecycleToolbox, sqlRequest);
+  }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlStatementFactoryFactory.java b/sql/src/main/java/org/apache/druid/sql/SqlStatementFactoryFactory.java
deleted file mode 100644
index 5d32735cfe..0000000000
--- a/sql/src/main/java/org/apache/druid/sql/SqlStatementFactoryFactory.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql;
-
-import com.google.common.base.Supplier;
-import com.google.inject.Inject;
-import org.apache.druid.guice.LazySingleton;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.DefaultQueryConfig;
-import org.apache.druid.server.QueryScheduler;
-import org.apache.druid.server.log.RequestLogger;
-import org.apache.druid.server.security.AuthConfig;
-import org.apache.druid.sql.calcite.planner.PlannerFactory;
-import org.apache.druid.sql.calcite.run.SqlEngine;
-import org.apache.druid.sql.http.SqlQuery;
-
-import javax.servlet.http.HttpServletRequest;
-
-/**
- * Factory factories: when design patterns go too far.
- *
- * Almost everything we need to create a {@link SqlStatementFactory} is injectable, except for the {@link SqlEngine}.
- * So this class exists to produce {@link SqlStatementFactory} once the engine for a query is known.
- */
-@LazySingleton
-public class SqlStatementFactoryFactory
-{
-  protected final SqlToolbox lifecycleToolbox;
-
-  @Inject
-  public SqlStatementFactoryFactory(
-      final PlannerFactory plannerFactory,
-      final ServiceEmitter emitter,
-      final RequestLogger requestLogger,
-      final QueryScheduler queryScheduler,
-      final AuthConfig authConfig,
-      final Supplier<DefaultQueryConfig> defaultQueryConfig,
-      final SqlLifecycleManager sqlLifecycleManager
-  )
-  {
-    this.lifecycleToolbox = new SqlToolbox(
-        null,
-        plannerFactory,
-        emitter,
-        requestLogger,
-        queryScheduler,
-        authConfig,
-        defaultQueryConfig.get(),
-        sqlLifecycleManager
-    );
-  }
-
-  public SqlStatementFactory factorize(final SqlEngine engine)
-  {
-    return new FactoryImpl(lifecycleToolbox.withEngine(engine));
-  }
-
-  private static class FactoryImpl implements SqlStatementFactory
-  {
-    private final SqlToolbox lifecycleToolbox;
-
-    public FactoryImpl(SqlToolbox lifecycleToolbox)
-    {
-      this.lifecycleToolbox = lifecycleToolbox;
-    }
-
-    @Override
-    public HttpStatement httpStatement(
-        final SqlQuery sqlQuery,
-        final HttpServletRequest req
-    )
-    {
-      return new HttpStatement(lifecycleToolbox, sqlQuery, req);
-    }
-
-    @Override
-    public DirectStatement directStatement(final SqlQueryPlus sqlRequest)
-    {
-      return new DirectStatement(lifecycleToolbox, sqlRequest);
-    }
-
-    @Override
-    public PreparedStatement preparedStatement(final SqlQueryPlus sqlRequest)
-    {
-      return new PreparedStatement(lifecycleToolbox, sqlRequest);
-    }
-  }
-}
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
index 36a69dd815..95005b7bf5 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
@@ -103,7 +103,7 @@ public class DruidJdbcResultSet implements Closeable
     ensure(State.NEW);
     try {
       state = State.RUNNING;
-      final Sequence<Object[]> baseSequence = yielderOpenCloseExecutor.submit(stmt::execute).get();
+      final Sequence<Object[]> baseSequence = yielderOpenCloseExecutor.submit(stmt::execute).get().getResults();
 
       // We can't apply limits greater than Integer.MAX_VALUE, ignore them.
       final Sequence<Object[]> retSequence =
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
index 89239a0bee..75e4d70c5b 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
@@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import org.apache.calcite.avatica.AvaticaSeverity;
 import org.apache.calcite.avatica.MetaImpl;
 import org.apache.calcite.avatica.MissingResultsException;
@@ -38,6 +37,7 @@ import org.apache.calcite.avatica.remote.AvaticaRuntimeException;
 import org.apache.calcite.avatica.remote.Service.ErrorResponse;
 import org.apache.calcite.avatica.remote.TypedValue;
 import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.annotations.NativeQuery;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.UOE;
@@ -49,10 +49,8 @@ import org.apache.druid.server.security.AuthenticatorMapper;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.sql.SqlQueryPlus;
 import org.apache.druid.sql.SqlStatementFactory;
-import org.apache.druid.sql.SqlStatementFactoryFactory;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.run.NativeSqlEngine;
 import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
@@ -125,15 +123,14 @@ public class DruidMeta extends MetaImpl
 
   @Inject
   public DruidMeta(
-      final NativeSqlEngine engine,
-      final SqlStatementFactoryFactory sqlStatementFactoryFactory,
+      final @NativeQuery SqlStatementFactory sqlStatementFactory,
       final AvaticaServerConfig config,
       final ErrorHandler errorHandler,
-      final Injector injector
+      final AuthenticatorMapper authMapper
   )
   {
     this(
-        sqlStatementFactoryFactory.factorize(engine),
+        sqlStatementFactory,
         config,
         errorHandler,
         Executors.newSingleThreadScheduledExecutor(
@@ -142,7 +139,7 @@ public class DruidMeta extends MetaImpl
                 .setDaemon(true)
                 .build()
         ),
-        injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
+        authMapper.getAuthenticatorChain()
     );
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index 07f2a9dcb0..75be75855c 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -68,12 +68,12 @@ import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.Query;
 import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
@@ -462,7 +462,7 @@ public class DruidPlanner implements Closeable
       }
 
       // Start the query.
-      final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
+      final Supplier<QueryResponse> resultsSupplier = () -> {
         // sanity check
         final Set<ResourceAction> readResourceActions =
             plannerContext.getResourceActions()
@@ -536,38 +536,40 @@ public class DruidPlanner implements Closeable
               planner.getTypeFactory(),
               plannerContext.getParameters()
       );
-      final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
+      final Supplier<QueryResponse> resultsSupplier = () -> {
         final Enumerable<?> enumerable = theRel.bind(dataContext);
         final Enumerator<?> enumerator = enumerable.enumerator();
-        return Sequences.withBaggage(new BaseSequence<>(
-            new BaseSequence.IteratorMaker<Object[], EnumeratorIterator<Object[]>>()
-            {
-              @Override
-              public EnumeratorIterator<Object[]> make()
-              {
-                return new EnumeratorIterator<>(new Iterator<Object[]>()
+        return QueryResponse.withEmptyContext(Sequences.withBaggage(
+            new BaseSequence<>(
+                new BaseSequence.IteratorMaker<Object[], EnumeratorIterator<Object[]>>()
                 {
                   @Override
-                  public boolean hasNext()
+                  public EnumeratorIterator<Object[]> make()
                   {
-                    return enumerator.moveNext();
+                    return new EnumeratorIterator<>(new Iterator<Object[]>()
+                    {
+                      @Override
+                      public boolean hasNext()
+                      {
+                        return enumerator.moveNext();
+                      }
+
+                      @Override
+                      public Object[] next()
+                      {
+                        return (Object[]) enumerator.current();
+                      }
+                    });
                   }
 
                   @Override
-                  public Object[] next()
+                  public void cleanup(EnumeratorIterator<Object[]> iterFromMake)
                   {
-                    return (Object[]) enumerator.current();
-                  }
-                });
-              }
-
-              @Override
-              public void cleanup(EnumeratorIterator<Object[]> iterFromMake)
-              {
 
-              }
-            }
-        ), enumerator::close);
+                  }
+                }
+            ), enumerator::close)
+        );
       };
       return new PlannerResult(resultsSupplier, root.validatedRowType);
     }
@@ -606,8 +608,9 @@ public class DruidPlanner implements Closeable
       log.error(jpe, "Encountered exception while serializing Resources for explain output");
       resourcesString = null;
     }
-    final Supplier<Sequence<Object[]>> resultsSupplier = Suppliers.ofInstance(
-        Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString})));
+    final Supplier<QueryResponse> resultsSupplier = Suppliers.ofInstance(
+        QueryResponse.withEmptyContext(Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString})))
+    );
     return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory()));
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
index 619f6c5509..57e3ba2cbc 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
@@ -22,7 +22,7 @@ package org.apache.druid.sql.calcite.planner;
 import com.google.common.base.Supplier;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.server.QueryResponse;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -32,12 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class PlannerResult
 {
-  private final Supplier<Sequence<Object[]>> resultsSupplier;
+  private final Supplier<QueryResponse> resultsSupplier;
   private final RelDataType rowType;
   private final AtomicBoolean didRun = new AtomicBoolean();
 
   public PlannerResult(
-      final Supplier<Sequence<Object[]>> resultsSupplier,
+      final Supplier<QueryResponse> resultsSupplier,
       final RelDataType rowType
   )
   {
@@ -53,7 +53,7 @@ public class PlannerResult
   /**
    * Run the query
    */
-  public Sequence<Object[]> run()
+  public QueryResponse run()
   {
     if (!didRun.compareAndSet(false, true)) {
       // Safety check.
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
index 6f601ec5aa..9043577a7d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
@@ -22,7 +22,7 @@ package org.apache.druid.sql.calcite.rel;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.AbstractRelNode;
-import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 
 import javax.annotation.Nullable;
@@ -45,7 +45,7 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
   @Nullable
   public abstract PartialDruidQuery getPartialDruidQuery();
 
-  public Sequence<Object[]> runQuery()
+  public QueryResponse runQuery()
   {
     // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
     // is the outermost query, and it will actually get run as a native query. Druid's native query layer will
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
index 25e6e9f523..de1bc8b758 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
@@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.rel;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -30,14 +31,19 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -47,7 +53,7 @@ import java.util.stream.Collectors;
  * but rather, it represents the concatenation of a series of native queries in the SQL layer. Therefore,
  * {@link #getPartialDruidQuery()} returns null, and this rel cannot be built on top of. It must be the outer rel in a
  * query plan.
- *
+ * <p>
  * See {@link DruidUnionDataSourceRel} for a version that does a regular Druid query using a {@link UnionDataSource}.
  * In the future we expect that {@link UnionDataSource} will gain the ability to union query datasources together, and
  * then this rel could be replaced by {@link DruidUnionDataSourceRel}.
@@ -100,18 +106,51 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
   }
 
   @Override
-  @SuppressWarnings("unchecked")
-  public Sequence<Object[]> runQuery()
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public QueryResponse runQuery()
   {
     // Lazy: run each query in sequence, not all at once.
     if (limit == 0) {
-      return Sequences.empty();
+      return new QueryResponse(Sequences.empty(), ResponseContext.createEmpty());
     } else {
-      final Sequence baseSequence = Sequences.concat(
-          FluentIterable.from(rels).transform(rel -> ((DruidRel) rel).runQuery())
-      );
 
-      return limit > 0 ? baseSequence.limit(limit) : baseSequence;
+      // We run the first rel here for two reasons:
+      // 1) So that we get things running as normally expected when runQuery() is called
+      // 2) So that we have a QueryResponse to return, note that the response headers from the query will only
+      //    have values from this first query and will not contain values from subsequent queries.  This is definitely
+      //    sub-optimal, the other option would be to fire off all queries and combine their QueryResponses, but that
+      //    is also sub-optimal as it would consume parallel query resources and potentially starve the system.
+      //    Instead, we only return the headers from the first query and potentially exception out and fail the query
+      //    if there are any response headers that come from subsequent queries that are correctness concerns
+      final QueryResponse queryResponse = ((DruidRel) rels.get(0)).runQuery();
+
+      final List<Sequence<Object>> firstAsList = Collections.singletonList(queryResponse.getResults());
+      final Iterable<Sequence<Object>> theRestTransformed = FluentIterable
+          .from(rels.subList(1, rels.size()))
+          .transform(
+              rel -> {
+                final QueryResponse response = ((DruidRel) rel).runQuery();
+
+                final ResponseContext nextContext = response.getResponseContext();
+                final List<Interval> uncoveredIntervals = nextContext.getUncoveredIntervals();
+                if (uncoveredIntervals == null || uncoveredIntervals.isEmpty()) {
+                  return response.getResults();
+                } else {
+                  throw new ISE(
+                      "uncoveredIntervals[%s] existed on a sub-query of a union, incomplete data, failing",
+                      uncoveredIntervals
+                  );
+                }
+              }
+          );
+
+      final Iterable<Sequence<Object>> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed);
+
+      final Sequence returnSequence = Sequences.concat(recombinedSequences);
+      return new QueryResponse(
+          limit > 0 ? returnSequence.limit(limit) : returnSequence,
+          queryResponse.getResponseContext()
+      );
     }
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
index 1a78705ab7..f045769ec9 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
@@ -54,6 +54,7 @@ import org.apache.druid.segment.data.ComparableList;
 import org.apache.druid.segment.data.ComparableStringArray;
 import org.apache.druid.server.QueryLifecycle;
 import org.apache.druid.server.QueryLifecycleFactory;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.sql.calcite.planner.Calcites;
@@ -93,7 +94,7 @@ public class NativeQueryMaker implements QueryMaker
   }
 
   @Override
-  public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
+  public QueryResponse runQuery(final DruidQuery druidQuery)
   {
     final Query<?> query = druidQuery.getQuery();
 
@@ -172,7 +173,7 @@ public class NativeQueryMaker implements QueryMaker
                              .orElseGet(query::getIntervals);
   }
 
-  private <T> Sequence<Object[]> execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes)
+  private <T> QueryResponse execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes)
   {
     Hook.QUERY_PLAN.run(query);
 
@@ -194,23 +195,22 @@ public class NativeQueryMaker implements QueryMaker
     // otherwise it won't yet be initialized. (A bummer, since ideally, we'd verify the toolChest exists and can do
     // array-based results before starting the query; but in practice we don't expect this to happen since we keep
     // tight control over which query types we generate in the SQL layer. They all support array-based results.)
-    final Sequence<T> results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult);
+    final QueryResponse results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult);
 
-    //noinspection unchecked
-    final QueryToolChest<T, Query<T>> toolChest = queryLifecycle.getToolChest();
-    final List<String> resultArrayFields = toolChest.resultArraySignature(query).getColumnNames();
-    final Sequence<Object[]> resultArrays = toolChest.resultsAsArrays(query, results);
 
-    return mapResultSequence(resultArrays, resultArrayFields, newFields, newTypes);
+    return mapResultSequence(results, queryLifecycle.getToolChest(), query, newFields, newTypes);
   }
 
-  private Sequence<Object[]> mapResultSequence(
-      final Sequence<Object[]> sequence,
-      final List<String> originalFields,
+  private <T> QueryResponse mapResultSequence(
+      final QueryResponse results,
+      final QueryToolChest<T, Query<T>> toolChest,
+      final Query<T> query,
       final List<String> newFields,
       final List<SqlTypeName> newTypes
   )
   {
+    final List<String> originalFields = toolChest.resultArraySignature(query).getColumnNames();
+
     // Build hash map for looking up original field positions, in case the number of fields is super high.
     final Object2IntMap<String> originalFieldsLookup = new Object2IntOpenHashMap<>();
     originalFieldsLookup.defaultReturnValue(-1);
@@ -234,15 +234,20 @@ public class NativeQueryMaker implements QueryMaker
       mapping[i] = idx;
     }
 
-    return Sequences.map(
-        sequence,
-        array -> {
-          final Object[] newArray = new Object[mapping.length];
-          for (int i = 0; i < mapping.length; i++) {
-            newArray[i] = coerce(array[mapping[i]], newTypes.get(i));
-          }
-          return newArray;
-        }
+    //noinspection unchecked
+    final Sequence<Object[]> sequence = toolChest.resultsAsArrays(query, results.getResults());
+    return new QueryResponse(
+        Sequences.map(
+            sequence,
+            array -> {
+              final Object[] newArray = new Object[mapping.length];
+              for (int i = 0; i < mapping.length; i++) {
+                newArray[i] = coerce(array[mapping[i]], newTypes.get(i));
+              }
+              return newArray;
+            }
+        ),
+        results.getResponseContext()
     );
   }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
index 72dae85d0e..8acc02230c 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
@@ -19,7 +19,7 @@
 
 package org.apache.druid.sql.calcite.run;
 
-import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.sql.calcite.rel.DruidQuery;
 
 /**
@@ -33,5 +33,5 @@ public interface QueryMaker
    * created for. The returned arrays match the row type given by {@link SqlEngine#resultTypeForSelect} or
    * {@link SqlEngine#resultTypeForInsert}, depending on the nature of the statement.
    */
-  Sequence<Object[]> runQuery(DruidQuery druidQuery);
+  QueryResponse runQuery(DruidQuery druidQuery);
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
index 427108d9d8..5d6d386a61 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
@@ -66,7 +66,6 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -907,7 +906,7 @@ public class SegmentMetadataCache
 
     return queryLifecycleFactory
         .factorize()
-        .runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK);
+        .runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK).getResults();
   }
 
   @VisibleForTesting
diff --git a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
index 589ed2575d..614ed62339 100644
--- a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
+++ b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
@@ -19,18 +19,30 @@
 
 package org.apache.druid.sql.guice;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
 import com.google.inject.Key;
 import com.google.inject.Module;
+import com.google.inject.Provides;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.PolyBind;
+import org.apache.druid.guice.annotations.NativeQuery;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DefaultQueryConfig;
+import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.log.RequestLogger;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.sql.SqlLifecycleManager;
+import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.SqlToolbox;
 import org.apache.druid.sql.avatica.AvaticaModule;
 import org.apache.druid.sql.calcite.aggregation.SqlAggregationModule;
 import org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConversion;
 import org.apache.druid.sql.calcite.planner.CalcitePlannerModule;
+import org.apache.druid.sql.calcite.planner.PlannerFactory;
+import org.apache.druid.sql.calcite.run.NativeSqlEngine;
 import org.apache.druid.sql.calcite.schema.DruidCalciteSchemaModule;
 import org.apache.druid.sql.calcite.schema.DruidSchemaManager;
 import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager;
@@ -50,17 +62,10 @@ public class SqlModule implements Module
   public static final String PROPERTY_SQL_SCHEMA_MANAGER_TYPE = "druid.sql.schemamanager.type";
   public static final String PROPERTY_SQL_APPROX_COUNT_DISTINCT_CHOICE = "druid.sql.approxCountDistinct.function";
 
-  @Inject
   private Properties props;
 
-  public SqlModule()
-  {
-  }
-
-  @VisibleForTesting
-  public SqlModule(
-      Properties props
-  )
+  @Inject
+  public void setProps(Properties props)
   {
     this.props = props;
   }
@@ -101,6 +106,8 @@ public class SqlModule implements Module
     binder.install(new SqlAggregationModule());
     binder.install(new DruidViewModule());
 
+    binder.install(new SqlStatementFactoryModule());
+
     // QueryLookupOperatorConversion isn't in DruidOperatorTable since it needs a LookupExtractorFactoryContainerProvider injected.
     SqlBindings.addOperatorConversion(binder, QueryLookupOperatorConversion.class);
 
@@ -130,4 +137,59 @@ public class SqlModule implements Module
     Preconditions.checkNotNull(props, "props");
     return Boolean.valueOf(props.getProperty(PROPERTY_SQL_ENABLE_AVATICA, "true"));
   }
+
+  /**
+   * We create a new class for this module so that it can be shared by tests.  The structuring of the SqlModule
+   * at time of writing was not conducive to reuse in test code, so, instead of fixing that we just take the easy
+   * way out of adding the test-reusable code to this module and reuse that.
+   *
+   * Generally speaking, the injection pattern done by this module is a bit circuitous.  The `SqlToolbox` acts as
+   * if it can be injected with all of its dependencies, but also expects to be mutated with a new SqlEngine.  We
+   * should likely look at adjusting the object dependencies to actually depend on the SqlToolbox and create
+   * different Toolboxes for the different way that queries are done.  But, for now, I'm not changing the interfaces.
+   */
+  public static class SqlStatementFactoryModule implements Module
+  {
+
+    @Provides
+    @LazySingleton
+    public SqlToolbox makeSqlToolbox(
+        final PlannerFactory plannerFactory,
+        final ServiceEmitter emitter,
+        final RequestLogger requestLogger,
+        final QueryScheduler queryScheduler,
+        final AuthConfig authConfig,
+        final Supplier<DefaultQueryConfig> defaultQueryConfig,
+        final SqlLifecycleManager sqlLifecycleManager
+    )
+    {
+      return new SqlToolbox(
+          null,
+          plannerFactory,
+          emitter,
+          requestLogger,
+          queryScheduler,
+          authConfig,
+          defaultQueryConfig.get(),
+          sqlLifecycleManager
+      );
+    }
+
+    @Provides
+    @NativeQuery
+    @LazySingleton
+    public SqlStatementFactory makeNativeSqlStatementFactory(
+        final NativeSqlEngine sqlEngine,
+        SqlToolbox toolbox
+    )
+    {
+      return new SqlStatementFactory(toolbox.withEngine(sqlEngine));
+    }
+
+    @Override
+    public void configure(Binder binder)
+    {
+      // Do nothing, this class exists for the Provider methods
+    }
+  }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java b/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
index 4d7a5d26bb..1fc64ccded 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
@@ -22,6 +22,7 @@ package org.apache.druid.sql.http;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import org.apache.druid.guice.Jerseys;
+import org.apache.druid.guice.LazySingleton;
 
 /**
  * The Module responsible for providing bindings to the SQL http endpoint
@@ -31,6 +32,7 @@ public class SqlHttpModule implements Module
   @Override
   public void configure(Binder binder)
   {
+    binder.bind(SqlResource.class).in(LazySingleton.class);
     Jerseys.addResource(binder, SqlResource.class);
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index 28623a622e..db7f7d4ae7 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -21,15 +21,14 @@ package org.apache.druid.sql.http;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.io.CountingOutputStream;
 import com.google.inject.Inject;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.druid.common.exception.SanitizableException;
-import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.NativeQuery;
+import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.guava.Yielders;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -38,6 +37,10 @@ import org.apache.druid.query.QueryCapacityExceededException;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryTimeoutException;
 import org.apache.druid.query.QueryUnsupportedException;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.QueryResource;
+import org.apache.druid.server.QueryResponse;
+import org.apache.druid.server.ResponseContextConfig;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AuthorizationUtils;
@@ -52,8 +55,6 @@ import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
 import org.apache.druid.sql.SqlPlanningException;
 import org.apache.druid.sql.SqlRowTransformer;
 import org.apache.druid.sql.SqlStatementFactory;
-import org.apache.druid.sql.SqlStatementFactoryFactory;
-import org.apache.druid.sql.calcite.run.NativeSqlEngine;
 
 import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
@@ -63,13 +64,14 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
-
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -87,33 +89,18 @@ public class SqlResource
   private final SqlStatementFactory sqlStatementFactory;
   private final SqlLifecycleManager sqlLifecycleManager;
   private final ServerConfig serverConfig;
+  private final ResponseContextConfig responseContextConfig;
+  private final DruidNode selfNode;
 
   @Inject
-  public SqlResource(
-      @Json ObjectMapper jsonMapper,
-      AuthorizerMapper authorizerMapper,
-      NativeSqlEngine engine,
-      SqlStatementFactoryFactory sqlStatementFactoryFactory,
-      SqlLifecycleManager sqlLifecycleManager,
-      ServerConfig serverConfig
-  )
-  {
-    this(
-        jsonMapper,
-        authorizerMapper,
-        sqlStatementFactoryFactory.factorize(engine),
-        sqlLifecycleManager,
-        serverConfig
-    );
-  }
-
-  @VisibleForTesting
   SqlResource(
       final ObjectMapper jsonMapper,
       final AuthorizerMapper authorizerMapper,
-      final SqlStatementFactory sqlStatementFactory,
+      final @NativeQuery SqlStatementFactory sqlStatementFactory,
       final SqlLifecycleManager sqlLifecycleManager,
-      final ServerConfig serverConfig
+      final ServerConfig serverConfig,
+      ResponseContextConfig responseContextConfig,
+      @Self DruidNode selfNode
   )
   {
     this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
@@ -121,6 +108,9 @@ public class SqlResource
     this.sqlStatementFactory = Preconditions.checkNotNull(sqlStatementFactory, "sqlStatementFactory");
     this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager, "sqlLifecycleManager");
     this.serverConfig = Preconditions.checkNotNull(serverConfig, "serverConfig");
+    this.responseContextConfig = responseContextConfig;
+    this.selfNode = selfNode;
+
   }
 
   @POST
@@ -138,17 +128,20 @@ public class SqlResource
     try {
       Thread.currentThread().setName(StringUtils.format("sql[%s]", sqlQueryId));
       ResultSet resultSet = stmt.plan();
-      final Sequence<Object[]> sequence = resultSet.run();
+      final QueryResponse response = resultSet.run();
       final SqlRowTransformer rowTransformer = resultSet.createRowTransformer();
-      final Yielder<Object[]> yielder0 = Yielders.each(sequence);
+      final Yielder<Object[]> finalYielder = Yielders.each(response.getResults());
 
-      try {
-        final Response.ResponseBuilder responseBuilder = Response
-            .ok(
-                (StreamingOutput) outputStream -> {
+      final Response.ResponseBuilder responseBuilder = Response
+          .ok(
+              new StreamingOutput()
+              {
+                @Override
+                public void write(OutputStream output) throws IOException, WebApplicationException
+                {
                   Exception e = null;
-                  CountingOutputStream os = new CountingOutputStream(outputStream);
-                  Yielder<Object[]> yielder = yielder0;
+                  CountingOutputStream os = new CountingOutputStream(output);
+                  Yielder<Object[]> yielder = finalYielder;
 
                   try (final ResultFormat.Writer writer = sqlQuery.getResultFormat()
                                                                   .createFormatter(os, jsonMapper)) {
@@ -185,20 +178,24 @@ public class SqlResource
                     endLifecycle(stmt, e, os.getCount());
                   }
                 }
-            )
-            .header(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId);
-
-        if (sqlQuery.includeHeader()) {
-          responseBuilder.header(SQL_HEADER_RESPONSE_HEADER, SQL_HEADER_VALUE);
-        }
+              }
+          )
+          .header(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId);
 
-        return responseBuilder.build();
-      }
-      catch (Throwable e) {
-        // make sure to close yielder if anything happened before starting to serialize the response.
-        yielder0.close();
-        throw new RuntimeException(e);
+      if (sqlQuery.includeHeader()) {
+        responseBuilder.header(SQL_HEADER_RESPONSE_HEADER, SQL_HEADER_VALUE);
       }
+
+      QueryResource.attachResponseContextToHttpResponse(
+          sqlQueryId,
+          response.getResponseContext(),
+          responseBuilder,
+          jsonMapper,
+          responseContextConfig,
+          selfNode
+      );
+
+      return responseBuilder.build();
     }
     catch (QueryCapacityExceededException cap) {
       endLifecycle(stmt, cap, -1);
diff --git a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
index 2004fe9988..0653c533a1 100644
--- a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.sql;
 
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -70,7 +69,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import javax.servlet.http.HttpServletRequest;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -159,15 +157,18 @@ public class SqlStatementTest
         new CalciteRulesManager(ImmutableSet.of())
     );
 
-    this.sqlStatementFactory = new SqlStatementFactoryFactory(
-        plannerFactory,
-        new NoopServiceEmitter(),
-        testRequestLogger,
-        QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new AuthConfig(),
-        Suppliers.ofInstance(defaultQueryConfig),
-        new SqlLifecycleManager()
-    ).factorize(CalciteTests.createMockSqlEngine(walker, conglomerate));
+    this.sqlStatementFactory = new SqlStatementFactory(
+        new SqlToolbox(
+            CalciteTests.createMockSqlEngine(walker, conglomerate),
+            plannerFactory,
+            new NoopServiceEmitter(),
+            testRequestLogger,
+            QueryStackTests.DEFAULT_NOOP_SCHEDULER,
+            new AuthConfig(),
+            defaultQueryConfig,
+            new SqlLifecycleManager()
+        )
+    );
   }
 
   @After
@@ -221,7 +222,7 @@ public class SqlStatementTest
     DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
     ResultSet resultSet = stmt.plan();
     assertTrue(resultSet.runnable());
-    List<Object[]> results = resultSet.run().toList();
+    List<Object[]> results = resultSet.run().getResults().toList();
     assertEquals(1, results.size());
     assertEquals(6L, results.get(0)[0]);
     assertEquals("foo", results.get(0)[1]);
@@ -341,7 +342,7 @@ public class SqlStatementTest
         makeQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo"),
         request(true)
         );
-    List<Object[]> results = stmt.execute().toList();
+    List<Object[]> results = stmt.execute().getResults().toList();
     assertEquals(1, results.size());
     assertEquals(6L, results.get(0)[0]);
     assertEquals("foo", results.get(0)[1]);
@@ -422,6 +423,7 @@ public class SqlStatementTest
       List<Object[]> results = stmt
           .execute(Collections.emptyList())
           .execute()
+          .getResults()
           .toList();
       assertEquals(1, results.size());
       assertEquals(6L, results.get(0)[0]);
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 03763c4b5e..7d989cae23 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -72,8 +72,6 @@ import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
-import org.apache.druid.sql.calcite.run.NativeSqlEngine;
-import org.apache.druid.sql.calcite.run.SqlEngine;
 import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
 import org.apache.druid.sql.calcite.schema.DruidSchemaName;
 import org.apache.druid.sql.calcite.schema.NamedSchema;
@@ -81,6 +79,7 @@ import org.apache.druid.sql.calcite.util.CalciteTestBase;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.QueryLogHook;
 import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.sql.guice.SqlModule;
 import org.eclipse.jetty.server.Server;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -221,7 +220,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
             binder.bind(QueryScheduler.class)
                   .toProvider(QuerySchedulerProvider.class)
                   .in(LazySingleton.class);
-            binder.bind(SqlEngine.class).to(NativeSqlEngine.class);
+            binder.install(new SqlModule.SqlStatementFactoryModule());
             binder.bind(new TypeLiteral<Supplier<DefaultQueryConfig>>(){}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())));
             binder.bind(CalciteRulesManager.class).toInstance(new CalciteRulesManager(ImmutableSet.of()));
           }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index aea6a094d3..9171905960 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -917,7 +917,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
             .auth(authenticationResult)
             .build()
     );
-    Sequence<Object[]> results = stmt.execute();
+    Sequence<Object[]> results = stmt.execute().getResults();
     RelDataType rowType = stmt.prepareResult().getReturnedRowType();
     return new Pair<>(
         RowSignatures.fromRelDataType(rowType.getFieldNames(), rowType),
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
index 2bd1dc2a8a..d614c23d5d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
@@ -4559,7 +4559,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
 
     QueryLifecycleFactory qlf = CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate);
     QueryLifecycle ql = qlf.factorize();
-    Sequence seq = ql.runSimple(query, CalciteTests.SUPER_USER_AUTH_RESULT, Access.OK);
+    Sequence seq = ql.runSimple(query, CalciteTests.SUPER_USER_AUTH_RESULT, Access.OK).getResults();
     List<Object> results = seq.toList();
     Assert.assertEquals(
         ImmutableList.of(ResultRow.of("def")),
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
index 3823bd4af7..f49bb0c40e 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
@@ -200,8 +200,8 @@ public class SqlVectorizedExpressionSanityTest extends InitializedNullHandlingTe
     ) {
       final PlannerResult vectorPlan = vectorPlanner.plan();
       final PlannerResult nonVectorPlan = nonVectorPlanner.plan();
-      final Sequence<Object[]> vectorSequence = vectorPlan.run();
-      final Sequence<Object[]> nonVectorSequence = nonVectorPlan.run();
+      final Sequence<Object[]> vectorSequence = vectorPlan.run().getResults();
+      final Sequence<Object[]> nonVectorSequence = nonVectorPlan.run().getResults();
       Yielder<Object[]> vectorizedYielder = Yielders.each(vectorSequence);
       Yielder<Object[]> nonVectorizedYielder = Yielders.each(nonVectorSequence);
       int row = 0;
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java b/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
index 2576149d76..22c004947a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
@@ -21,9 +21,9 @@ package org.apache.druid.sql.calcite;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.runtime.Hook;
-import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.sql.calcite.rel.DruidQuery;
 import org.apache.druid.sql.calcite.run.QueryMaker;
 
@@ -45,7 +45,7 @@ public class TestInsertQueryMaker implements QueryMaker
   }
 
   @Override
-  public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
+  public QueryResponse runQuery(final DruidQuery druidQuery)
   {
     // Don't actually execute anything, but do record information that tests will check for.
 
@@ -53,6 +53,8 @@ public class TestInsertQueryMaker implements QueryMaker
     Hook.QUERY_PLAN.run(druidQuery.getQuery());
 
     // 2) Return the dataSource and signature of the insert operation, so tests can confirm they are correct.
-    return Sequences.simple(ImmutableList.of(new Object[]{targetDataSource, signature}));
+    return QueryResponse.withEmptyContext(
+        Sequences.simple(ImmutableList.of(new Object[]{targetDataSource, signature}))
+    );
   }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
index 99caee8125..1ba95b8bef 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.client.BrokerInternalQueryConfig;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -53,6 +54,7 @@ import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.QueryLifecycle;
 import org.apache.druid.server.QueryLifecycleFactory;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.Access;
@@ -1089,7 +1091,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
     EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
     // This is the mat of the test, making sure that the query created by the method under test matches the expected query, specifically the operator configured context
     EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
-            .andReturn(null);
+            .andReturn(QueryResponse.withEmptyContext(Sequences.empty()));
 
     EasyMock.replay(factoryMock, lifecycleMock);
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 6b051e8109..adc132d736 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -114,7 +114,7 @@ import org.apache.druid.server.security.NoopEscalator;
 import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.sql.SqlLifecycleManager;
 import org.apache.druid.sql.SqlStatementFactory;
-import org.apache.druid.sql.SqlStatementFactoryFactory;
+import org.apache.druid.sql.SqlToolbox;
 import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
@@ -804,15 +804,18 @@ public class CalciteTests
       final AuthConfig authConfig
   )
   {
-    return new SqlStatementFactoryFactory(
-        plannerFactory,
-        new ServiceEmitter("dummy", "dummy", new NoopEmitter()),
-        new NoopRequestLogger(),
-        QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        authConfig,
-        Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())),
-        new SqlLifecycleManager()
-    ).factorize(engine);
+    return new SqlStatementFactory(
+        new SqlToolbox(
+            engine,
+            plannerFactory,
+            new ServiceEmitter("dummy", "dummy", new NoopEmitter()),
+            new NoopRequestLogger(),
+            QueryStackTests.DEFAULT_NOOP_SCHEDULER,
+            authConfig,
+            new DefaultQueryConfig(ImmutableMap.of()),
+            new SqlLifecycleManager()
+        )
+    );
   }
 
   public static ObjectMapper getJsonMapper()
diff --git a/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java b/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java
index 4b87c207f0..12b8d5f8a9 100644
--- a/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java
@@ -53,6 +53,8 @@ import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.QuerySchedulerProvider;
+import org.apache.druid.server.ResponseContextConfig;
+import org.apache.druid.server.initialization.AuthenticatorMapperModule;
 import org.apache.druid.server.log.NoopRequestLogger;
 import org.apache.druid.server.log.RequestLogger;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -63,6 +65,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.view.DruidViewMacro;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
 import org.apache.druid.sql.calcite.view.ViewManager;
+import org.apache.druid.sql.http.SqlResourceTest;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
@@ -165,12 +168,16 @@ public class SqlModuleTest
 
   private Injector makeInjectorWithProperties(final Properties props)
   {
+    final SqlModule sqlModule = new SqlModule();
+    sqlModule.setProps(props);
+
     return Guice.createInjector(
         ImmutableList.of(
             new DruidGuiceExtensions(),
             new LifecycleModule(),
             new ServerModule(),
             new JacksonModule(),
+            new AuthenticatorMapperModule(),
             (Module) binder -> {
               binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
               binder.bind(JsonConfigurator.class).in(LazySingleton.class);
@@ -196,8 +203,9 @@ public class SqlModuleTest
               binder.bind(QueryScheduler.class)
                     .toProvider(QuerySchedulerProvider.class)
                     .in(LazySingleton.class);
+              binder.bind(ResponseContextConfig.class).toInstance(SqlResourceTest.TEST_RESPONSE_CONTEXT_CONFIG);
             },
-            new SqlModule(props),
+            sqlModule,
             new TestViewManagerModule()
         )
     );
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java
index e105398998..6916358cb5 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java
@@ -29,11 +29,13 @@ import org.apache.druid.guice.DruidGuiceExtensions;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.annotations.JSR311Resource;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.NativeQuery;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.ResponseContextConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.sql.SqlStatementFactory;
-import org.apache.druid.sql.SqlStatementFactoryFactory;
 import org.apache.druid.sql.calcite.run.NativeSqlEngine;
-import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
@@ -50,8 +52,6 @@ public class SqlHttpModuleTest
 {
   @Mock
   private ObjectMapper jsonMpper;
-  @Mock
-  private SqlStatementFactoryFactory sqlStatementFactoryFactory;
 
   private SqlHttpModule target;
   private Injector injector;
@@ -59,39 +59,40 @@ public class SqlHttpModuleTest
   @Before
   public void setUp()
   {
-    EasyMock.expect(sqlStatementFactoryFactory.factorize(EasyMock.capture(Capture.newInstance())))
-            .andReturn(EasyMock.mock(SqlStatementFactory.class))
-            .anyTimes();
-    EasyMock.replay(sqlStatementFactoryFactory);
-
     target = new SqlHttpModule();
     injector = Guice.createInjector(
         new LifecycleModule(),
         new DruidGuiceExtensions(),
         binder -> {
           binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(jsonMpper);
-          binder.bind(SqlStatementFactoryFactory.class).toInstance(sqlStatementFactoryFactory);
           binder.bind(AuthorizerMapper.class).toInstance(new AuthorizerMapper(Collections.emptyMap()));
           binder.bind(NativeSqlEngine.class).toProvider(Providers.of(new NativeSqlEngine(null, null)));
+          binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance(SqlResourceTest.DUMMY_DRUID_NODE);
+          binder.bind(ResponseContextConfig.class).toInstance(SqlResourceTest.TEST_RESPONSE_CONTEXT_CONFIG);
+          binder.bind(SqlStatementFactory.class)
+                .annotatedWith(NativeQuery.class)
+                .toInstance(EasyMock.mock(SqlStatementFactory.class));
         },
         target
     );
   }
 
   @Test
-  public void testSqlResourceIsInjectedAndNotSingleton()
+  public void testSqlResourceIsInjectedAndSingleton()
   {
     SqlResource sqlResource = injector.getInstance(SqlResource.class);
     Assert.assertNotNull(sqlResource);
     SqlResource other = injector.getInstance(SqlResource.class);
-    Assert.assertNotSame(other, sqlResource);
+    Assert.assertSame(other, sqlResource);
   }
 
   @Test
   public void testSqlResourceIsAvailableViaJersey()
   {
     Set<Class<?>> jerseyResourceClasses =
-        injector.getInstance(Key.get(new TypeLiteral<Set<Class<?>>>() {}, JSR311Resource.class));
+        injector.getInstance(Key.get(new TypeLiteral<Set<Class<?>>>()
+        {
+        }, JSR311Resource.class));
     Assert.assertEquals(1, jerseyResourceClasses.size());
     Assert.assertEquals(SqlResource.class, jerseyResourceClasses.iterator().next());
   }
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index 4a16c344c1..90fadf9280 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.calcite.avatica.SqlType;
+import org.apache.commons.io.output.NullOutputStream;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.exception.AllowedRegexErrorResponseTransformStrategy;
 import org.apache.druid.common.exception.ErrorResponseTransformStrategy;
@@ -57,9 +58,13 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QueryTimeoutException;
 import org.apache.druid.query.QueryUnsupportedException;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.QueryResponse;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.ResponseContextConfig;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.log.TestRequestLogger;
 import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
@@ -96,10 +101,8 @@ import org.easymock.EasyMock;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -110,7 +113,6 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -130,11 +132,15 @@ import java.util.stream.Collectors;
 
 public class SqlResourceTest extends CalciteTestBase
 {
+  public static final DruidNode DUMMY_DRUID_NODE = new DruidNode("dummy", "dummy", false, 1, null, true, false);
+  public static final ResponseContextConfig TEST_RESPONSE_CONTEXT_CONFIG = ResponseContextConfig.newConfig(false);
+
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
   private static final String DUMMY_SQL_QUERY_ID = "dummy";
   // Timeout to allow (rapid) debugging, while not blocking tests with errors.
   private static final int WAIT_TIMEOUT_SECS = 60;
-  private static final Consumer<DirectStatement> NULL_ACTION = s -> {};
+  private static final Consumer<DirectStatement> NULL_ACTION = s -> {
+  };
 
   private static final List<String> EXPECTED_COLUMNS_FOR_RESULT_FORMAT_TESTS =
       Arrays.asList("__time", "dim1", "dim2", "dim3", "cnt", "m1", "m2", "unique_dim1", "EXPR$8");
@@ -166,26 +172,17 @@ public class SqlResourceTest extends CalciteTestBase
   private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> planLatchSupplier = new SettableSupplier<>();
   private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> executeLatchSupplier = new SettableSupplier<>();
   private final SettableSupplier<Function<Sequence<Object[]>, Sequence<Object[]>>> sequenceMapFnSupplier = new SettableSupplier<>();
+  private final SettableSupplier<ResponseContext> responseContextSupplier = new SettableSupplier<>();
   private Consumer<DirectStatement> onExecute = NULL_ACTION;
 
   private boolean sleep;
 
-  @BeforeClass
-  public static void setUpClass()
+  @Before
+  public void setUp() throws Exception
   {
     resourceCloser = Closer.create();
     conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
-  }
 
-  @AfterClass
-  public static void tearDownClass() throws IOException
-  {
-    resourceCloser.close();
-  }
-
-  @Before
-  public void setUp() throws Exception
-  {
     final QueryScheduler scheduler = new QueryScheduler(
         5,
         ManualQueryPrioritizationStrategy.INSTANCE,
@@ -265,7 +262,7 @@ public class SqlResourceTest extends CalciteTestBase
         defaultQueryConfig,
         lifecycleManager
     );
-    sqlStatementFactory = new SqlStatementFactory()
+    sqlStatementFactory = new SqlStatementFactory(null)
     {
       @Override
       public HttpStatement httpStatement(
@@ -281,6 +278,7 @@ public class SqlResourceTest extends CalciteTestBase
             planLatchSupplier,
             executeLatchSupplier,
             sequenceMapFnSupplier,
+            responseContextSupplier,
             onExecute
         );
         onExecute = NULL_ACTION;
@@ -304,7 +302,9 @@ public class SqlResourceTest extends CalciteTestBase
         CalciteTests.TEST_AUTHORIZER_MAPPER,
         sqlStatementFactory,
         lifecycleManager,
-        new ServerConfig()
+        new ServerConfig(),
+        TEST_RESPONSE_CONTEXT_CONFIG,
+        DUMMY_DRUID_NODE
     );
   }
 
@@ -320,6 +320,7 @@ public class SqlResourceTest extends CalciteTestBase
     walker = null;
     executorService.shutdownNow();
     executorService.awaitTermination(2, TimeUnit.SECONDS);
+    resourceCloser.close();
   }
 
   @Test
@@ -358,6 +359,55 @@ public class SqlResourceTest extends CalciteTestBase
     Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
   }
 
+  @Test
+  public void testCountStarWithMissingIntervalsContext() throws Exception
+  {
+    final SqlQuery sqlQuery = new SqlQuery(
+        "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
+        null,
+        false,
+        false,
+        false,
+        // We set uncoveredIntervalsLimit more for the funzies than anything.  The underlying setup of the test doesn't
+        // actually look at it or operate with it.  Instead, we set the supplier of the ResponseContext to mock what
+        // we would expect from the normal query pipeline
+        ImmutableMap.of(BaseQuery.SQL_QUERY_ID, "id", "uncoveredIntervalsLimit", 1),
+        null
+    );
+
+    final ResponseContext mockRespContext = ResponseContext.createEmpty();
+    mockRespContext.put(ResponseContext.Keys.instance().keyOf("uncoveredIntervals"), "2030-01-01/78149827981274-01-01");
+    mockRespContext.put(ResponseContext.Keys.instance().keyOf("uncoveredIntervalsOverflowed"), "true");
+    responseContextSupplier.set(mockRespContext);
+
+    final Response response = resource.doPost(sqlQuery, makeRegularUserReq());
+
+    Map responseContext = JSON_MAPPER.readValue(
+        (String) response.getMetadata().getFirst("X-Druid-Response-Context"),
+        Map.class
+    );
+    Assert.assertEquals(
+        ImmutableMap.of(
+            "uncoveredIntervals", "2030-01-01/78149827981274-01-01",
+            "uncoveredIntervalsOverflowed", "true"
+        ),
+        responseContext
+    );
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ((StreamingOutput) response.getEntity()).write(baos);
+    Object results = JSON_MAPPER.readValue(baos.toByteArray(), Object.class);
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            ImmutableMap.of("cnt", 6, "TheFoo", "foo")
+        ),
+        results
+    );
+    checkSqlRequestLog(true);
+    Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
+  }
+
   @Test
   public void testSqlLifecycleMetrics() throws Exception
   {
@@ -594,7 +644,9 @@ public class SqlResourceTest extends CalciteTestBase
         ),
         doPost(
             new SqlQuery(query, ResultFormat.ARRAY, false, false, false, null, null),
-            new TypeReference<List<List<Object>>>() {}
+            new TypeReference<List<List<Object>>>()
+            {
+            }
         ).rhs
     );
   }
@@ -705,7 +757,9 @@ public class SqlResourceTest extends CalciteTestBase
         ),
         doPost(
             new SqlQuery(query, ResultFormat.ARRAY, true, true, true, null, null),
-            new TypeReference<List<List<Object>>>() {}
+            new TypeReference<List<List<Object>>>()
+            {
+            }
         ).rhs
     );
   }
@@ -729,7 +783,9 @@ public class SqlResourceTest extends CalciteTestBase
         ),
         doPost(
             new SqlQuery(query, ResultFormat.ARRAY, true, true, true, null, null),
-            new TypeReference<List<List<Object>>>() {}
+            new TypeReference<List<List<Object>>>()
+            {
+            }
         ).rhs
     );
   }
@@ -896,7 +952,9 @@ public class SqlResourceTest extends CalciteTestBase
         ).stream().map(transformer).collect(Collectors.toList()),
         doPost(
             new SqlQuery(query, ResultFormat.OBJECT, false, false, false, null, null),
-            new TypeReference<List<Map<String, Object>>>() {}
+            new TypeReference<List<Map<String, Object>>>()
+            {
+            }
         ).rhs
     );
   }
@@ -1102,10 +1160,10 @@ public class SqlResourceTest extends CalciteTestBase
     Assert.assertEquals(4, lines.size());
     Assert.assertEquals(expectedHeader, JSON_MAPPER.readValue(lines.get(0), Object.class));
     Assert.assertEquals(
-            ImmutableMap
-                .<String, Object>builder()
-                .put("EXPR$0", Arrays.asList(1, 2))
-                .build(),
+        ImmutableMap
+            .<String, Object>builder()
+            .put("EXPR$0", Arrays.asList(1, 2))
+            .build(),
         JSON_MAPPER.readValue(lines.get(1), Object.class)
     );
 
@@ -1338,8 +1396,8 @@ public class SqlResourceTest extends CalciteTestBase
             false,
             ImmutableMap.of(BaseQuery.SQL_QUERY_ID, "id"),
             null
-            )
-        ).lhs;
+        )
+    ).lhs;
 
     Assert.assertNotNull(exception);
     Assert.assertEquals(QueryUnsupportedException.ERROR_CODE, exception.getErrorCode());
@@ -1362,8 +1420,9 @@ public class SqlResourceTest extends CalciteTestBase
             false,
             ImmutableMap.of("sqlQueryId", queryId),
             null
-            ),
-        req);
+        ),
+        req
+    );
     Assert.assertNotEquals(200, response.getStatus());
     final MultivaluedMap<String, Object> headers = response.getMetadata();
     Assert.assertTrue(headers.containsKey(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER));
@@ -1385,8 +1444,9 @@ public class SqlResourceTest extends CalciteTestBase
             false,
             ImmutableMap.of(),
             null
-            ),
-        req);
+        ),
+        req
+    );
     Assert.assertNotEquals(200, response.getStatus());
     final MultivaluedMap<String, Object> headers = response.getMetadata();
     Assert.assertTrue(headers.containsKey(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER));
@@ -1402,7 +1462,8 @@ public class SqlResourceTest extends CalciteTestBase
         CalciteTests.TEST_AUTHORIZER_MAPPER,
         sqlStatementFactory,
         lifecycleManager,
-        new ServerConfig() {
+        new ServerConfig()
+        {
           @Override
           public boolean isShowDetailedJettyErrors()
           {
@@ -1414,7 +1475,9 @@ public class SqlResourceTest extends CalciteTestBase
           {
             return new AllowedRegexErrorResponseTransformStrategy(ImmutableList.of());
           }
-        }
+        },
+        TEST_RESPONSE_CONTEXT_CONFIG,
+        DUMMY_DRUID_NODE
     );
 
     String errorMessage = "This will be supported in Druid 9999";
@@ -1428,8 +1491,8 @@ public class SqlResourceTest extends CalciteTestBase
             false,
             ImmutableMap.of("sqlQueryId", "id"),
             null
-          )
-        ).lhs;
+        )
+    ).lhs;
 
     Assert.assertNotNull(exception);
     Assert.assertNull(exception.getMessage());
@@ -1460,7 +1523,9 @@ public class SqlResourceTest extends CalciteTestBase
           {
             return new AllowedRegexErrorResponseTransformStrategy(ImmutableList.of());
           }
-        }
+        },
+        TEST_RESPONSE_CONTEXT_CONFIG,
+        DUMMY_DRUID_NODE
     );
 
     String errorMessage = "could not assert";
@@ -1477,8 +1542,8 @@ public class SqlResourceTest extends CalciteTestBase
             false,
             ImmutableMap.of("sqlQueryId", "id"),
             null
-            )
-        ).lhs;
+        )
+    ).lhs;
 
     Assert.assertNotNull(exception);
     Assert.assertNull(exception.getMessage());
@@ -1653,6 +1718,24 @@ public class SqlResourceTest extends CalciteTestBase
 
     execLatch.countDown();
     response = future.get();
+    // The response that we get is the actual object created by the SqlResource.  The StreamingOutput object that
+    // the SqlResource returns at the time of writing has resources opened up (the query is already running) which
+    // need to be closed.  As such, the StreamingOutput needs to actually be called in order to cause that close
+    // to occur, so we must get the entity out and call `.write(OutputStream)` on it to invoke the code.
+    try {
+      ((StreamingOutput) response.getEntity()).write(NullOutputStream.NULL_OUTPUT_STREAM);
+    }
+    catch (IllegalStateException e) {
+      // When we actually attempt to write to the output stream, we seem to run into multi-threading issues likely
+      // with our test setup.  Instead of figuring out how to make the thing work, given that we don't actually
+      // care about the response, we are going to just ensure that it was the expected exception and ignore it.
+      // It's possible that this test starts failing suddenly if someone changes the message of the exception, it
+      // should be safe to just update the expected message here too if that happens.
+      Assert.assertEquals(
+          "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or metric information from multiple threads or from an async thread, this information should explicitly be passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be reassigned explicitly",
+          e.getMessage()
+      );
+    }
     Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
   }
 
@@ -1906,6 +1989,7 @@ public class SqlResourceTest extends CalciteTestBase
     private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> planLatchSupplier;
     private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> executeLatchSupplier;
     private final SettableSupplier<Function<Sequence<Object[]>, Sequence<Object[]>>> sequenceMapFnSupplier;
+    private final SettableSupplier<ResponseContext> responseContextSupplier;
     private final Consumer<DirectStatement> onExecute;
 
     private TestHttpStatement(
@@ -1916,6 +2000,7 @@ public class SqlResourceTest extends CalciteTestBase
         SettableSupplier<NonnullPair<CountDownLatch, Boolean>> planLatchSupplier,
         SettableSupplier<NonnullPair<CountDownLatch, Boolean>> executeLatchSupplier,
         SettableSupplier<Function<Sequence<Object[]>, Sequence<Object[]>>> sequenceMapFnSupplier,
+        SettableSupplier<ResponseContext> responseContextSupplier,
         final Consumer<DirectStatement> onAuthorize
     )
     {
@@ -1924,13 +2009,15 @@ public class SqlResourceTest extends CalciteTestBase
       this.planLatchSupplier = planLatchSupplier;
       this.executeLatchSupplier = executeLatchSupplier;
       this.sequenceMapFnSupplier = sequenceMapFnSupplier;
+      this.responseContextSupplier = responseContextSupplier;
       this.onExecute = onAuthorize;
     }
 
     @Override
     protected void authorize(
         DruidPlanner planner,
-        Function<Set<ResourceAction>, Access> authorizer)
+        Function<Set<ResourceAction>, Access> authorizer
+    )
     {
       if (validateAndAuthorizeLatchSupplier.get() != null) {
         if (validateAndAuthorizeLatchSupplier.get().rhs) {
@@ -1955,14 +2042,15 @@ public class SqlResourceTest extends CalciteTestBase
     @Override
     public PlannerResult createPlan(DruidPlanner planner)
     {
-      if (planLatchSupplier.get() != null) {
-        if (planLatchSupplier.get().rhs) {
+      final NonnullPair<CountDownLatch, Boolean> planLatch = planLatchSupplier.get();
+      if (planLatch != null) {
+        if (planLatch.rhs) {
           PlannerResult result = super.createPlan(planner);
-          planLatchSupplier.get().lhs.countDown();
+          planLatch.lhs.countDown();
           return result;
         } else {
           try {
-            if (!planLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
+            if (!planLatch.lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
               throw new RuntimeException("Latch timed out");
             }
           }
@@ -1989,30 +2077,37 @@ public class SqlResourceTest extends CalciteTestBase
       return new ResultSet(plannerResult)
       {
         @Override
-        public Sequence<Object[]> run()
+        public QueryResponse run()
         {
           final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
               Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
 
-          if (executeLatchSupplier.get() != null) {
-            if (executeLatchSupplier.get().rhs) {
-              Sequence<Object[]> sequence = sequenceMapFn.apply(super.run());
-              executeLatchSupplier.get().lhs.countDown();
-              return sequence;
+          final NonnullPair<CountDownLatch, Boolean> executeLatch = executeLatchSupplier.get();
+          if (executeLatch != null) {
+            if (executeLatch.rhs) {
+              final QueryResponse resp = super.run();
+              Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
+              executeLatch.lhs.countDown();
+              final ResponseContext respContext = resp.getResponseContext();
+              respContext.merge(responseContextSupplier.get());
+              return new QueryResponse(sequence, respContext);
             } else {
               try {
-                if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
+                if (!executeLatch.lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
                   throw new RuntimeException("Latch timed out");
                 }
               }
               catch (InterruptedException e) {
                 throw new RuntimeException(e);
               }
-              return sequenceMapFn.apply(super.run());
             }
-          } else {
-            return sequenceMapFn.apply(super.run());
           }
+
+          final QueryResponse resp = super.run();
+          Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
+          final ResponseContext respContext = resp.getResponseContext();
+          respContext.merge(responseContextSupplier.get());
+          return new QueryResponse(sequence, respContext);
         }
       };
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org