You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/10 00:36:30 UTC

[hive] branch master updated: HIVE-23184 : Upgrade druid to 0.17.1 ( Nishant Bangarwa via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a6ec1e  HIVE-23184 : Upgrade druid to 0.17.1 ( Nishant Bangarwa via Ashutosh Chauhan)
9a6ec1e is described below

commit 9a6ec1e351d59b76419a21d1f2c8781e306b02d0
Author: Nishant Bangarwa <ni...@gmail.com>
AuthorDate: Mon Apr 13 22:29:26 2020 +0530

    HIVE-23184 : Upgrade druid to 0.17.1 ( Nishant Bangarwa via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 data/scripts/kafka_init_data.csv                   |   2 +-
 druid-handler/pom.xml                              |   6 +
 .../apache/hadoop/hive/druid/DruidKafkaUtils.java  |  43 ++-
 .../hadoop/hive/druid/DruidStorageHandler.java     |  61 ++--
 .../hive/druid/DruidStorageHandlerUtils.java       |  62 ++---
 .../hadoop/hive/druid/io/DruidOutputFormat.java    |  22 +-
 .../hive/druid/io/DruidQueryBasedInputFormat.java  |  56 +---
 .../hadoop/hive/druid/io/DruidRecordWriter.java    |  10 +-
 .../druid/json/KafkaIndexTaskTuningConfig.java     | 128 +++++++++
 .../hive/druid/json/KafkaSupervisorSpec.java       |  20 +-
 .../druid/json/KafkaSupervisorTuningConfig.java    | 208 +++++---------
 .../hadoop/hive/druid/json/KafkaTuningConfig.java  | 307 --------------------
 .../json/SeekableStreamIndexTaskTuningConfig.java  | 308 +++++++++++++++++++++
 .../json/SeekableStreamSupervisorTuningConfig.java |  59 ++++
 .../hive/druid/serde/DruidQueryRecordReader.java   |   3 +-
 .../druid/serde/DruidSelectQueryRecordReader.java  |  92 ------
 .../hadoop/hive/druid/TestDruidStorageHandler.java |   8 +-
 .../druid/TestHiveDruidQueryBasedInputFormat.java  |  44 ---
 .../hadoop/hive/druid/serde/TestDruidSerDe.java    | 244 ++++------------
 .../hadoop/hive/ql/io/TestDruidRecordWriter.java   |  21 +-
 itests/qtest-druid/pom.xml                         |  22 ++
 pom.xml                                            |   2 +-
 .../druid/druidkafkamini_delimited.q.out           |   2 +-
 .../druidmini_semijoin_reduction_all_types.q.out   |  24 +-
 24 files changed, 753 insertions(+), 1001 deletions(-)

diff --git a/data/scripts/kafka_init_data.csv b/data/scripts/kafka_init_data.csv
index 5dc094e..d818144 100644
--- a/data/scripts/kafka_init_data.csv
+++ b/data/scripts/kafka_init_data.csv
@@ -1,4 +1,4 @@
-"2013-08-31T01:02:33Z", "Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143
+"2013-08-31T01:02:33Z","Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143
 "2013-08-31T03:32:45Z","Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330
 "2013-08-31T07:11:21Z","Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111
 "2013-08-31T11:58:39Z","Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index c7a2d4c..e6ca298 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -293,6 +293,12 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <version>${log4j2.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java
index b56d48a..fb6ce30 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java
@@ -29,8 +29,8 @@ import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.FullResponseHandler;
-import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
@@ -66,11 +66,13 @@ final class DruidKafkaUtils {
   private DruidKafkaUtils() {
   }
 
-  static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table,
+  static KafkaSupervisorSpec createKafkaSupervisorSpec(
+      Table table,
       String kafkaTopic,
       String kafkaServers,
       DataSchema dataSchema,
-      IndexSpec indexSpec) {
+      IndexSpec indexSpec
+  ) {
     return new KafkaSupervisorSpec(dataSchema,
         new KafkaSupervisorTuningConfig(DruidStorageHandlerUtils.getIntegerProperty(table,
             DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsInMemory"),
@@ -78,17 +80,14 @@ final class DruidKafkaUtils {
                         DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxBytesInMemory"),
 
             DruidStorageHandlerUtils.getIntegerProperty(table,
-                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"),
-                DruidStorageHandlerUtils.getLongProperty(table,
-                        DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxTotalRows"),
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), DruidStorageHandlerUtils
+            .getLongProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxTotalRows"),
             DruidStorageHandlerUtils.getPeriodProperty(table,
-                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"),
-            null,
+                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), null,
             // basePersistDirectory - use druid default, no need to be configured by user
-            DruidStorageHandlerUtils.getIntegerProperty(table,
-                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"),
-            indexSpec,
-            null,
+            DruidStorageHandlerUtils
+                .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"),
+            indexSpec, null, null,
             // buildV9Directly - use druid default, no need to be configured by user
             DruidStorageHandlerUtils.getBooleanProperty(table,
                 DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"),
@@ -96,9 +95,8 @@ final class DruidKafkaUtils {
                 DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"),
             DruidStorageHandlerUtils.getBooleanProperty(table,
                 DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"),
-                TmpFileSegmentWriteOutMediumFactory.instance(),
-                DruidStorageHandlerUtils.getIntegerProperty(table,
-                DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"),
+            TmpFileSegmentWriteOutMediumFactory.instance(), DruidStorageHandlerUtils
+            .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"),
             DruidStorageHandlerUtils.getIntegerProperty(table,
                 DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"),
             DruidStorageHandlerUtils.getLongProperty(table,
@@ -161,14 +159,11 @@ final class DruidKafkaUtils {
       String task = JSON_MAPPER.writeValueAsString(spec);
       CONSOLE.printInfo("submitting kafka Spec {}", task);
       LOG.info("submitting kafka Supervisor Spec {}", task);
-      FullResponseHolder
-          response =
-          DruidStorageHandlerUtils.getResponseFromCurrentLeader(DruidStorageHandler.getHttpClient(),
-              new Request(HttpMethod.POST,
-                  new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress))).setContent(
-                  "application/json",
-                  JSON_MAPPER.writeValueAsBytes(spec)),
-              new FullResponseHandler(Charset.forName("UTF-8")));
+      StringFullResponseHolder response = DruidStorageHandlerUtils
+          .getResponseFromCurrentLeader(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.POST,
+                  new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
+                  .setContent("application/json", JSON_MAPPER.writeValueAsBytes(spec)),
+              new StringFullResponseHandler(Charset.forName("UTF-8")));
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
         String
             msg =
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index fe55eff..beaf249 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -39,8 +39,8 @@ import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.HttpClientConfig;
 import org.apache.druid.java.util.http.client.HttpClientInit;
 import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.FullResponseHandler;
-import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SQLMetadataConnector;
@@ -50,6 +50,7 @@ import org.apache.druid.metadata.storage.mysql.MySQLConnector;
 import org.apache.druid.metadata.storage.mysql.MySQLConnectorConfig;
 import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnector;
 import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnectorConfig;
+import org.apache.druid.metadata.storage.postgresql.PostgreSQLTablesConfig;
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -365,15 +366,12 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
 
   private void resetKafkaIngestion(String overlordAddress, String dataSourceName) {
     try {
-      FullResponseHolder
+      StringFullResponseHolder
           response =
           RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
-              new Request(HttpMethod.POST,
-                  new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/reset",
-                      overlordAddress,
-                      dataSourceName))),
-              new FullResponseHandler(Charset.forName("UTF-8"))),
-              input -> input instanceof IOException,
+              new Request(HttpMethod.POST, new URL(
+                  String.format("http://%s/druid/indexer/v1/supervisor/%s/reset", overlordAddress, dataSourceName))),
+              new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException,
               getMaxRetryCount());
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
         CONSOLE.printInfo("Druid Kafka Ingestion Reset successful.");
@@ -389,15 +387,12 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
 
   private void stopKafkaIngestion(String overlordAddress, String dataSourceName) {
     try {
-      FullResponseHolder
+      StringFullResponseHolder
           response =
           RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
-              new Request(HttpMethod.POST,
-                  new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/shutdown",
-                      overlordAddress,
-                      dataSourceName))),
-              new FullResponseHandler(Charset.forName("UTF-8"))),
-              input -> input instanceof IOException,
+              new Request(HttpMethod.POST, new URL(
+                  String.format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress, dataSourceName))),
+              new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException,
               getMaxRetryCount());
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
         CONSOLE.printInfo("Druid Kafka Ingestion shutdown successful.");
@@ -423,13 +418,12 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
         Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE),
             "Druid Datasource name is null");
     try {
-      FullResponseHolder
+      StringFullResponseHolder
           response =
           RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
               new Request(HttpMethod.GET,
                   new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s", overlordAddress, dataSourceName))),
-              new FullResponseHandler(Charset.forName("UTF-8"))),
-              input -> input instanceof IOException,
+              new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException,
               getMaxRetryCount());
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
         return JSON_MAPPER.readValue(response.getContent(), KafkaSupervisorSpec.class);
@@ -465,15 +459,12 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
         Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE),
             "Druid Datasource name is null");
     try {
-      FullResponseHolder
+      StringFullResponseHolder
           response =
           RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
-              new Request(HttpMethod.GET,
-                  new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/status",
-                      overlordAddress,
-                      dataSourceName))),
-              new FullResponseHandler(Charset.forName("UTF-8"))),
-              input -> input instanceof IOException,
+              new Request(HttpMethod.GET, new URL(
+                  String.format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress, dataSourceName))),
+              new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException,
               getMaxRetryCount());
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
         return DruidStorageHandlerUtils.JSON_MAPPER.readValue(response.getContent(), KafkaSupervisorReport.class);
@@ -546,9 +537,8 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
       coordinatorResponse =
           RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
               new Request(HttpMethod.GET, new URL(String.format("http://%s/status", coordinatorAddress))),
-              new FullResponseHandler(Charset.forName("UTF-8"))).getContent(),
-              input -> input instanceof IOException,
-              maxTries);
+              new StringFullResponseHandler(Charset.forName("UTF-8"))).getContent(),
+              input -> input instanceof IOException, maxTries);
     } catch (Exception e) {
       CONSOLE.printInfo("Will skip waiting for data loading, coordinator unavailable");
       return;
@@ -578,11 +568,9 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
     while (numRetries++ < maxTries && !urlsOfUnloadedSegments.isEmpty()) {
       urlsOfUnloadedSegments = ImmutableSet.copyOf(Sets.filter(urlsOfUnloadedSegments, input -> {
         try {
-          String
-              result =
-              DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
-                  new Request(HttpMethod.GET, input),
-                  new FullResponseHandler(Charset.forName("UTF-8"))).getContent();
+          String result = DruidStorageHandlerUtils
+              .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, input),
+                  new StringFullResponseHandler(Charset.forName("UTF-8"))).getContent();
 
           LOG.debug("Checking segment [{}] response is [{}]", input, result);
           return Strings.isNullOrEmpty(result);
@@ -878,9 +866,8 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
     case "postgresql":
       connector =
           new PostgreSQLConnector(storageConnectorConfigSupplier,
-              Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()),
-                  new PostgreSQLConnectorConfig()
-          );
+              Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()), new PostgreSQLConnectorConfig(),
+              new PostgreSQLTablesConfig());
 
       break;
     case "derby":
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 1d7009b..5723150 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import org.apache.calcite.adapter.druid.DruidQuery;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.guice.BloomFilterSerializersModule;
@@ -46,9 +47,9 @@ import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.FullResponseHandler;
-import org.apache.druid.java.util.http.client.response.FullResponseHolder;
 import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SQLMetadataConnector;
@@ -79,8 +80,6 @@ import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.ordering.StringComparator;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.scan.ScanQuery;
-import org.apache.druid.query.select.SelectQuery;
-import org.apache.druid.query.select.SelectQueryConfig;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.topn.TopNQuery;
@@ -225,25 +224,18 @@ public final class DruidStorageHandlerUtils {
   private static final int DEFAULT_MAX_TRIES = 10;
 
   static {
+    // This is needed to initialize NullHandling for druid without guice.
+    NullHandling.initializeForTests();
     // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig
-    InjectableValues.Std
-        injectableValues =
-        new InjectableValues.Std().addValue(SelectQueryConfig.class, new SelectQueryConfig(false))
-            // Expressions macro table used when we deserialize the query from calcite plan
-            .addValue(ExprMacroTable.class,
-                new ExprMacroTable(ImmutableList.of(new LikeExprMacro(),
-                    new RegexpExtractExprMacro(),
-                    new TimestampCeilExprMacro(),
-                    new TimestampExtractExprMacro(),
-                    new TimestampFormatExprMacro(),
-                    new TimestampParseExprMacro(),
-                    new TimestampShiftExprMacro(),
-                    new TimestampFloorExprMacro(),
-                    new TrimExprMacro.BothTrimExprMacro(),
-                    new TrimExprMacro.LeftTrimExprMacro(),
-                    new TrimExprMacro.RightTrimExprMacro())))
-            .addValue(ObjectMapper.class, JSON_MAPPER)
-            .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
+    InjectableValues.Std injectableValues = new InjectableValues.Std()
+        // Expressions macro table used when we deserialize the query from calcite plan
+        .addValue(ExprMacroTable.class, new ExprMacroTable(ImmutableList
+            .of(new LikeExprMacro(), new RegexpExtractExprMacro(), new TimestampCeilExprMacro(),
+                new TimestampExtractExprMacro(), new TimestampFormatExprMacro(), new TimestampParseExprMacro(),
+                new TimestampShiftExprMacro(), new TimestampFloorExprMacro(), new TrimExprMacro.BothTrimExprMacro(),
+                new TrimExprMacro.LeftTrimExprMacro(), new TrimExprMacro.RightTrimExprMacro())))
+        .addValue(ObjectMapper.class, JSON_MAPPER)
+        .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT);
 
     JSON_MAPPER.setInjectableValues(injectableValues);
     SMILE_MAPPER.setInjectableValues(injectableValues);
@@ -331,10 +323,9 @@ public final class DruidStorageHandlerUtils {
 
   }
 
-  static FullResponseHolder getResponseFromCurrentLeader(HttpClient client,
-      Request request,
-      FullResponseHandler fullResponseHandler) throws ExecutionException, InterruptedException {
-    FullResponseHolder responseHolder = client.go(request, fullResponseHandler).get();
+  static StringFullResponseHolder getResponseFromCurrentLeader(HttpClient client, Request request,
+      StringFullResponseHandler fullResponseHandler) throws ExecutionException, InterruptedException {
+    StringFullResponseHolder responseHolder = client.go(request, fullResponseHandler).get();
     if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(responseHolder.getStatus())) {
       String redirectUrlStr = responseHolder.getResponse().headers().get("Location");
       LOG.debug("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr);
@@ -342,9 +333,9 @@ public final class DruidStorageHandlerUtils {
       try {
         redirectUrl = new URL(redirectUrlStr);
       } catch (MalformedURLException ex) {
-        throw new ExecutionException(String.format(
-            "Malformed redirect location is found in response from url[%s], new location[%s].",
-            request.getUrl(),
+        throw new ExecutionException(String
+            .format("Malformed redirect location is found in response from url[%s], new location[%s].",
+                request.getUrl(),
             redirectUrlStr), ex);
       }
       responseHolder = client.go(withUrl(request, redirectUrl), fullResponseHandler).get();
@@ -638,12 +629,11 @@ public final class DruidStorageHandlerUtils {
   }
 
   public static String createScanAllQuery(String dataSourceName, List<String> columns) throws JsonProcessingException {
-    final ScanQuery.ScanQueryBuilder scanQueryBuilder = ScanQuery.newScanQueryBuilder();
+    final Druids.ScanQueryBuilder scanQueryBuilder = Druids.newScanQueryBuilder();
     final List<Interval> intervals = Collections.singletonList(DEFAULT_INTERVAL);
     ScanQuery
         scanQuery =
-        scanQueryBuilder.dataSource(dataSourceName)
-            .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
+        scanQueryBuilder.dataSource(dataSourceName).resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
             .intervals(new MultipleIntervalSegmentSpec(intervals))
             .columns(columns)
             .build();
@@ -977,11 +967,7 @@ public final class DruidStorageHandlerUtils {
             .setVirtualColumns(VirtualColumns.create(virtualColumns)).build();
         break;
       case org.apache.druid.query.Query.SCAN:
-        rv = ScanQuery.ScanQueryBuilder.copy((ScanQuery) query).filters(filter)
-            .virtualColumns(VirtualColumns.create(virtualColumns)).build();
-        break;
-      case org.apache.druid.query.Query.SELECT:
-        rv = Druids.SelectQueryBuilder.copy((SelectQuery) query).filters(filter)
+        rv = Druids.ScanQueryBuilder.copy((ScanQuery) query).filters(filter)
             .virtualColumns(VirtualColumns.create(virtualColumns)).build();
         break;
       default:
@@ -1140,8 +1126,6 @@ public final class DruidStorageHandlerUtils {
       return ((GroupByQuery) query).getVirtualColumns();
     case org.apache.druid.query.Query.SCAN:
       return ((ScanQuery) query).getVirtualColumns();
-    case org.apache.druid.query.Query.SELECT:
-      return ((SelectQuery) query).getVirtualColumns();
     default:
       throw new UnsupportedOperationException("Unsupported Query type " + query);
     }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 6cf3ef2..d90db9c 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -152,25 +152,9 @@ public class DruidOutputFormat implements HiveOutputFormat<NullWritable, DruidWr
     Integer maxRowInMemory = HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_ROW_IN_MEMORY);
 
     IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(jc);
-    RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory,
-            null,
-            null,
-            null,
-            new File(basePersistDirectory, dataSource),
-            new CustomVersioningPolicy(version),
-            null,
-            null,
-            null,
-            indexSpec,
-            true,
-            0,
-            0,
-            true,
-            null,
-            0L,
-        null,
-            null
-    );
+    RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory, null, null, null,
+        new File(basePersistDirectory, dataSource), new CustomVersioningPolicy(version), null, null, null, indexSpec,
+        null, true, 0, 0, true, null, 0L, null, null);
 
     LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
     return new DruidRecordWriter(dataSchema, realtimeTuningConfig,
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 82a1f11..37e7206 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -19,28 +19,24 @@ package org.apache.hadoop.hive.druid.io;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.LocatedSegmentDescriptor;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.scan.ScanQuery;
-import org.apache.druid.query.select.PagingSpec;
-import org.apache.druid.query.select.SelectQuery;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.druid.conf.DruidConstants;
 import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidScanQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
@@ -88,8 +84,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
       return new DruidTopNQueryRecordReader();
     case Query.GROUP_BY:
       return new DruidGroupByQueryRecordReader();
-    case Query.SELECT:
-      return new DruidSelectQueryRecordReader();
     case Query.SCAN:
       return new DruidScanQueryRecordReader();
     default:
@@ -152,9 +146,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     case Query.TOPN:
     case Query.GROUP_BY:
       return new HiveDruidSplit[] {new HiveDruidSplit(druidQuery, paths[0], new String[] {address})};
-    case Query.SELECT:
-      SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
-      return distributeSelectQuery(address, selectQuery, paths[0]);
     case Query.SCAN:
       ScanQuery scanQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, ScanQuery.class);
       return distributeScanQuery(address, scanQuery, paths[0]);
@@ -163,54 +154,13 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     }
   }
 
-  /* New method that distributes the Select query by creating splits containing
-   * information about different Druid nodes that have the data for the given
-   * query. */
-  private static HiveDruidSplit[] distributeSelectQuery(String address, SelectQuery query, Path dummyPath)
-      throws IOException {
-    // If it has a limit, we use it and we do not distribute the query
-    final boolean isFetch = query.getContextBoolean(DruidConstants.DRUID_QUERY_FETCH, false);
-    if (isFetch) {
-      return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query),
-          dummyPath,
-          new String[] {address})};
-    }
-
-    final List<LocatedSegmentDescriptor> segmentDescriptors = fetchLocatedSegmentDescriptors(address, query);
-
-    // Create one input split for each segment
-    final int numSplits = segmentDescriptors.size();
-    final HiveDruidSplit[] splits = new HiveDruidSplit[segmentDescriptors.size()];
-    for (int i = 0; i < numSplits; i++) {
-      final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i);
-      final String[] hosts = new String[locatedSD.getLocations().size()];
-      for (int j = 0; j < locatedSD.getLocations().size(); j++) {
-        hosts[j] = locatedSD.getLocations().get(j).getHost();
-      }
-      // Create partial Select query
-      final SegmentDescriptor
-          newSD =
-          new SegmentDescriptor(locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber());
-      //@TODO This is fetching all the rows at once from broker or multiple historical nodes
-      // Move to use scan query to avoid GC back pressure on the nodes
-      // https://issues.apache.org/jira/browse/HIVE-17627
-      final SelectQuery
-          partialQuery =
-          query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD)))
-              .withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
-      splits[i] =
-          new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath, hosts);
-    }
-    return splits;
-  }
-
   /* New method that distributes the Scan query by creating splits containing
    * information about different Druid nodes that have the data for the given
    * query. */
   private static HiveDruidSplit[] distributeScanQuery(String address, ScanQuery query, Path dummyPath)
       throws IOException {
     // If it has a limit, we use it and we do not distribute the query
-    final boolean isFetch = query.getLimit() < Long.MAX_VALUE;
+    final boolean isFetch = query.getScanRowsLimit() < Long.MAX_VALUE;
     if (isFetch) {
       return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query),
           dummyPath,
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 248b59a..dc16c4e 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -103,13 +103,9 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
             "realtimeTuningConfig is null");
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
 
-    appenderator =
-        Appenderators.createOffline(this.dataSchema,
-            tuningConfig,
-            new FireDepartmentMetrics(),
-            dataSegmentPusher,
-            DruidStorageHandlerUtils.JSON_MAPPER,
-            DruidStorageHandlerUtils.INDEX_IO,
+    appenderator = Appenderators
+        .createOffline("hive-offline-appenderator", this.dataSchema, tuningConfig, false, new FireDepartmentMetrics(),
+            dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.INDEX_IO,
             DruidStorageHandlerUtils.INDEX_MERGER_V9);
     this.maxPartitionSize = maxPartitionSize;
     appenderator.startJob();
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java
new file mode 100644
index 0000000..92800ce
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig {
+  @JsonCreator
+  public KafkaIndexTaskTuningConfig(
+      @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
+      @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+      @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
+      @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
+      @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
+      @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
+      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
+      // Ignored: kept only for compatibility when reading existing configs (upstream planned removal in Druid 0.12).
+      @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
+      @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
+      @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
+      @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
+      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+      @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
+  ) {
+    super(
+        maxRowsInMemory,
+        maxBytesInMemory,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        true,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        false,
+        segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+  }
+
+  @Override
+  public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) {
+    return new KafkaIndexTaskTuningConfig(
+        getMaxRowsInMemory(),
+        getMaxBytesInMemory(),
+        getMaxRowsPerSegment(),
+        getMaxTotalRows(),
+        getIntermediatePersistPeriod(),
+        dir,
+        getMaxPendingPersists(),
+        getIndexSpec(),
+        getIndexSpecForIntermediatePersists(),
+        true,
+        isReportParseExceptions(),
+        getHandoffConditionTimeout(),
+        isResetOffsetAutomatically(),
+        getSegmentWriteOutMediumFactory(),
+        getIntermediateHandoffPeriod(),
+        isLogParseExceptions(),
+        getMaxParseExceptions(),
+        getMaxSavedParseExceptions()
+    );
+  }
+
+
+  @Override
+  public String toString() {
+    return "KafkaIndexTaskTuningConfig{" +
+        "maxRowsInMemory=" + getMaxRowsInMemory() +
+        ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
+        ", maxTotalRows=" + getMaxTotalRows() +
+        ", maxBytesInMemory=" + getMaxBytesInMemory() +
+        ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
+        ", basePersistDirectory=" + getBasePersistDirectory() +
+        ", maxPendingPersists=" + getMaxPendingPersists() +
+        ", indexSpec=" + getIndexSpec() +
+        ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() +
+        ", reportParseExceptions=" + isReportParseExceptions() +
+        ", handoffConditionTimeout=" + getHandoffConditionTimeout() +
+        ", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
+        ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() +
+        ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
+        ", logParseExceptions=" + isLogParseExceptions() +
+        ", maxParseExceptions=" + getMaxParseExceptions() +
+        ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
+        '}';
+  }
+
+}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java
index d230832..18177b6 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java
@@ -52,24 +52,8 @@ import java.util.Map;
                 null,
                 null,
                 null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null,
-                    null);
+                null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
+                null, null);
     this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
     this.context = context;
   }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
index b4d38b9..4e17161 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
@@ -34,9 +34,14 @@ import java.io.File;
  * This class is copied from druid source code
  * in order to avoid adding additional dependencies on druid-indexing-service.
  */
-@SuppressWarnings("ALL") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({
-    @JsonSubTypes.Type(name = "kafka", value = KafkaSupervisorTuningConfig.class) })
-public class KafkaSupervisorTuningConfig extends KafkaTuningConfig {
+@SuppressWarnings("unused")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes({
+    @JsonSubTypes.Type(name = "kafka", value = KafkaSupervisorTuningConfig.class)})
+public class KafkaSupervisorTuningConfig
+    extends KafkaIndexTaskTuningConfig implements SeekableStreamSupervisorTuningConfig {
+  private static final String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S";
+
   private final Integer workerThreads;
   private final Integer chatThreads;
   private final Long chatRetries;
@@ -44,174 +49,113 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig {
   private final Duration shutdownTimeout;
   private final Duration offsetFetchPeriod;
 
-  public KafkaSupervisorTuningConfig(@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+  public static KafkaSupervisorTuningConfig defaultConfig() {
+    return new KafkaSupervisorTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null,
+        null, null, null, null, null, null, null, null, null, null, null);
+  }
+
+  public KafkaSupervisorTuningConfig(
+      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
-      @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
-      @JsonProperty("maxTotalRows") Long maxTotalRows,
+      @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, @JsonProperty("maxTotalRows") Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
       @JsonProperty("basePersistDirectory") File basePersistDirectory,
-      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec,
+      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
       // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
       @JsonProperty("buildV9Directly") Boolean buildV9Directly,
       @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
       @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
       @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
       @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("workerThreads") Integer workerThreads,
-      @JsonProperty("chatThreads") Integer chatThreads,
-      @JsonProperty("chatRetries") Long chatRetries,
-      @JsonProperty("httpTimeout") Period httpTimeout,
+      @JsonProperty("workerThreads") Integer workerThreads, @JsonProperty("chatThreads") Integer chatThreads,
+      @JsonProperty("chatRetries") Long chatRetries, @JsonProperty("httpTimeout") Period httpTimeout,
       @JsonProperty("shutdownTimeout") Period shutdownTimeout,
       @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
       @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
       @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
       @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions) {
-    super(maxRowsInMemory,
-        maxBytesInMemory,
-        maxRowsPerSegment,
-        maxTotalRows,
-        intermediatePersistPeriod,
-        basePersistDirectory,
-        maxPendingPersists,
-        indexSpec,
-        true,
-        reportParseExceptions,
-        handoffConditionTimeout,
-        resetOffsetAutomatically,
-        segmentWriteOutMediumFactory,
-        intermediateHandoffPeriod,
-        logParseExceptions,
-        maxParseExceptions,
-        maxSavedParseExceptions);
-
+    super(maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, maxTotalRows, intermediatePersistPeriod,
+        basePersistDirectory, maxPendingPersists, indexSpec, indexSpecForIntermediatePersists, true,
+        reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, maxSavedParseExceptions);
     this.workerThreads = workerThreads;
     this.chatThreads = chatThreads;
-    this.chatRetries = (chatRetries != null ? chatRetries : 8);
-    this.httpTimeout = defaultDuration(httpTimeout, "PT10S");
-    this.shutdownTimeout = defaultDuration(shutdownTimeout, "PT80S");
-    this.offsetFetchPeriod = defaultDuration(offsetFetchPeriod, "PT30S");
+    this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
+    this.httpTimeout = SeekableStreamSupervisorTuningConfig.defaultDuration(httpTimeout, DEFAULT_HTTP_TIMEOUT);
+    this.shutdownTimeout =
+        SeekableStreamSupervisorTuningConfig.defaultDuration(shutdownTimeout, DEFAULT_SHUTDOWN_TIMEOUT);
+    this.offsetFetchPeriod =
+        SeekableStreamSupervisorTuningConfig.defaultDuration(offsetFetchPeriod, DEFAULT_OFFSET_FETCH_PERIOD);
   }
 
-  @JsonProperty public Integer getWorkerThreads() {
+  @Override
+  @JsonProperty
+  public Integer getWorkerThreads() {
     return workerThreads;
   }
 
-  @JsonProperty public Integer getChatThreads() {
+  @Override
+  @JsonProperty
+  public Integer getChatThreads() {
     return chatThreads;
   }
 
-  @JsonProperty public Long getChatRetries() {
+  @Override
+  @JsonProperty
+  public Long getChatRetries() {
     return chatRetries;
   }
 
-  @JsonProperty public Duration getHttpTimeout() {
+  @Override
+  @JsonProperty
+  public Duration getHttpTimeout() {
     return httpTimeout;
   }
 
-  @JsonProperty public Duration getShutdownTimeout() {
+  @Override
+  @JsonProperty
+  public Duration getShutdownTimeout() {
     return shutdownTimeout;
   }
 
-  @JsonProperty public Duration getOffsetFetchPeriod() {
-    return offsetFetchPeriod;
-  }
-
-  @Override public String toString() {
-    return "KafkaSupervisorTuningConfig{"
-        + "maxRowsInMemory="
-        + getMaxRowsInMemory()
-        + ", maxRowsPerSegment="
-        + getMaxRowsPerSegment()
-        + ", maxTotalRows="
-        + getMaxTotalRows()
-        + ", maxBytesInMemory="
-        + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory())
-        + ", intermediatePersistPeriod="
-        + getIntermediatePersistPeriod()
-        + ", basePersistDirectory="
-        + getBasePersistDirectory()
-        + ", maxPendingPersists="
-        + getMaxPendingPersists()
-        + ", indexSpec="
-        + getIndexSpec()
-        + ", reportParseExceptions="
-        + isReportParseExceptions()
-        + ", handoffConditionTimeout="
-        + getHandoffConditionTimeout()
-        + ", resetOffsetAutomatically="
-        + isResetOffsetAutomatically()
-        + ", segmentWriteOutMediumFactory="
-        + getSegmentWriteOutMediumFactory()
-        + ", workerThreads="
-        + workerThreads
-        + ", chatThreads="
-        + chatThreads
-        + ", chatRetries="
-        + chatRetries
-        + ", httpTimeout="
-        + httpTimeout
-        + ", shutdownTimeout="
-        + shutdownTimeout
-        + ", offsetFetchPeriod="
-        + offsetFetchPeriod
-        + ", intermediateHandoffPeriod="
-        + getIntermediateHandoffPeriod()
-        + ", logParseExceptions="
-        + isLogParseExceptions()
-        + ", maxParseExceptions="
-        + getMaxParseExceptions()
-        + ", maxSavedParseExceptions="
-        + getMaxSavedParseExceptions()
-        + '}';
+  @Override
+  public Duration getRepartitionTransitionDuration() {
+    // Stopping tasks early for Kafka ingestion on partition set change is not supported yet,
+    // so just return the default for now.
+    return SeekableStreamSupervisorTuningConfig
+        .defaultDuration(null, SeekableStreamSupervisorTuningConfig.DEFAULT_REPARTITION_TRANSITION_DURATION);
   }
 
-  private static Duration defaultDuration(final Period period, final String theDefault) {
-    return (period == null ? new Period(theDefault) : period).toStandardDuration();
+  @JsonProperty
+  public Duration getOffsetFetchPeriod() {
+    return offsetFetchPeriod;
   }
 
-  @Override public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-
-    KafkaSupervisorTuningConfig that = (KafkaSupervisorTuningConfig) o;
-
-    if (workerThreads != null ? !workerThreads.equals(that.workerThreads) : that.workerThreads != null) {
-      return false;
-    }
-    if (chatThreads != null ? !chatThreads.equals(that.chatThreads) : that.chatThreads != null) {
-      return false;
-    }
-    if (chatRetries != null ? !chatRetries.equals(that.chatRetries) : that.chatRetries != null) {
-      return false;
-    }
-    if (httpTimeout != null ? !httpTimeout.equals(that.httpTimeout) : that.httpTimeout != null) {
-      return false;
-    }
-    if (shutdownTimeout != null ? !shutdownTimeout.equals(that.shutdownTimeout) : that.shutdownTimeout != null) {
-      return false;
-    }
-    return offsetFetchPeriod != null ?
-        offsetFetchPeriod.equals(that.offsetFetchPeriod) :
-        that.offsetFetchPeriod == null;
+  @Override
+  public String toString() {
+    return "KafkaSupervisorTuningConfig{" + "maxRowsInMemory=" + getMaxRowsInMemory() + ", maxRowsPerSegment="
+        + getMaxRowsPerSegment() + ", maxTotalRows=" + getMaxTotalRows() + ", maxBytesInMemory=" + TuningConfigs
+        .getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) + ", intermediatePersistPeriod="
+        + getIntermediatePersistPeriod() + ", basePersistDirectory=" + getBasePersistDirectory()
+        + ", maxPendingPersists=" + getMaxPendingPersists() + ", indexSpec=" + getIndexSpec()
+        + ", reportParseExceptions=" + isReportParseExceptions() + ", handoffConditionTimeout="
+        + getHandoffConditionTimeout() + ", resetOffsetAutomatically=" + isResetOffsetAutomatically()
+        + ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() + ", workerThreads=" + workerThreads
+        + ", chatThreads=" + chatThreads + ", chatRetries=" + chatRetries + ", httpTimeout=" + httpTimeout
+        + ", shutdownTimeout=" + shutdownTimeout + ", offsetFetchPeriod=" + offsetFetchPeriod
+        + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", logParseExceptions="
+        + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions="
+        + getMaxSavedParseExceptions() + '}';
   }
 
-  @Override public int hashCode() {
-    int result = super.hashCode();
-    result = 31 * result + (workerThreads != null ? workerThreads.hashCode() : 0);
-    result = 31 * result + (chatThreads != null ? chatThreads.hashCode() : 0);
-    result = 31 * result + (chatRetries != null ? chatRetries.hashCode() : 0);
-    result = 31 * result + (httpTimeout != null ? httpTimeout.hashCode() : 0);
-    result = 31 * result + (shutdownTimeout != null ? shutdownTimeout.hashCode() : 0);
-    result = 31 * result + (offsetFetchPeriod != null ? offsetFetchPeriod.hashCode() : 0);
-    return result;
+  @Override
+  public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() {
+    return new KafkaIndexTaskTuningConfig(getMaxRowsInMemory(), getMaxBytesInMemory(), getMaxRowsPerSegment(),
+        getMaxTotalRows(), getIntermediatePersistPeriod(), getBasePersistDirectory(), getMaxPendingPersists(),
+        getIndexSpec(), getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(),
+        getHandoffConditionTimeout(), isResetOffsetAutomatically(), getSegmentWriteOutMediumFactory(),
+        getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), getMaxSavedParseExceptions());
   }
 }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
deleted file mode 100644
index 45ac77b..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.druid.json;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.indexing.TuningConfig;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
-import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.joda.time.Period;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.util.Objects;
-
-/**
- * This class is copied from druid source code
- * in order to avoid adding additional dependencies on druid-indexing-service.
- */
-public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig {
-  private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
-  private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
-
-  private final int maxRowsInMemory;
-  private final long maxBytesInMemory;
-  private final int maxRowsPerSegment;
-  @Nullable private final Long maxTotalRows;
-  private final Period intermediatePersistPeriod;
-  private final File basePersistDirectory;
-  @Deprecated private final int maxPendingPersists;
-  private final IndexSpec indexSpec;
-  private final boolean reportParseExceptions;
-  @Deprecated private final long handoffConditionTimeout;
-  private final boolean resetOffsetAutomatically;
-  @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
-  private final Period intermediateHandoffPeriod;
-
-  private final boolean logParseExceptions;
-  private final int maxParseExceptions;
-  private final int maxSavedParseExceptions;
-
-  @JsonCreator public KafkaTuningConfig(@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
-      @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
-      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
-      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
-      @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
-      @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
-      @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
-      // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
-      @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
-      @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
-      @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
-      @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
-      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
-      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
-      @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions) {
-    // Cannot be a static because default basePersistDirectory is unique per-instance
-    final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
-
-    this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
-    this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment;
-    // initializing this to 0, it will be lazily initialized to a value
-    // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
-    this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
-    this.maxTotalRows = maxTotalRows;
-    this.intermediatePersistPeriod =
-        intermediatePersistPeriod == null ? defaults.getIntermediatePersistPeriod() : intermediatePersistPeriod;
-    this.basePersistDirectory = defaults.getBasePersistDirectory();
-    this.maxPendingPersists = 0;
-    this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
-    this.reportParseExceptions =
-        reportParseExceptions == null ? defaults.isReportParseExceptions() : reportParseExceptions;
-    this.handoffConditionTimeout =
-        handoffConditionTimeout == null ? defaults.getHandoffConditionTimeout() : handoffConditionTimeout;
-    this.resetOffsetAutomatically =
-        resetOffsetAutomatically == null ? DEFAULT_RESET_OFFSET_AUTOMATICALLY : resetOffsetAutomatically;
-    this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
-    this.intermediateHandoffPeriod =
-        intermediateHandoffPeriod == null ? new Period().withDays(Integer.MAX_VALUE) : intermediateHandoffPeriod;
-
-    if (this.reportParseExceptions) {
-      this.maxParseExceptions = 0;
-      this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions);
-    } else {
-      this.maxParseExceptions =
-          maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions;
-      this.maxSavedParseExceptions =
-          maxSavedParseExceptions == null ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS : maxSavedParseExceptions;
-    }
-    this.logParseExceptions =
-        logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions;
-  }
-
-  public static KafkaTuningConfig copyOf(KafkaTuningConfig config) {
-    return new KafkaTuningConfig(config.maxRowsInMemory,
-        config.maxBytesInMemory,
-        config.maxRowsPerSegment,
-        config.maxTotalRows,
-        config.intermediatePersistPeriod,
-        config.basePersistDirectory,
-        config.maxPendingPersists,
-        config.indexSpec,
-        true,
-        config.reportParseExceptions,
-        config.handoffConditionTimeout,
-        config.resetOffsetAutomatically,
-        config.segmentWriteOutMediumFactory,
-        config.intermediateHandoffPeriod,
-        config.logParseExceptions,
-        config.maxParseExceptions,
-        config.maxSavedParseExceptions);
-  }
-
-  @Override @JsonProperty public int getMaxRowsInMemory() {
-    return maxRowsInMemory;
-  }
-
-  @Override @JsonProperty public long getMaxBytesInMemory() {
-    return maxBytesInMemory;
-  }
-
-  @Override @JsonProperty public Integer getMaxRowsPerSegment() {
-    return maxRowsPerSegment;
-  }
-
-  @JsonProperty @Override @Nullable public Long getMaxTotalRows() {
-    return maxTotalRows;
-  }
-
-  @Override @JsonProperty public Period getIntermediatePersistPeriod() {
-    return intermediatePersistPeriod;
-  }
-
-  @Override @JsonProperty public File getBasePersistDirectory() {
-    return basePersistDirectory;
-  }
-
-  @Override @JsonProperty @Deprecated public int getMaxPendingPersists() {
-    return maxPendingPersists;
-  }
-
-  @Override @JsonProperty public IndexSpec getIndexSpec() {
-    return indexSpec;
-  }
-
-  /**
-   * Always returns true, doesn't affect the version being built.
-   */
-  @SuppressWarnings("SameReturnValue") @Deprecated @JsonProperty public boolean getBuildV9Directly() {
-    return true;
-  }
-
-  @Override @JsonProperty public boolean isReportParseExceptions() {
-    return reportParseExceptions;
-  }
-
-  @JsonProperty public long getHandoffConditionTimeout() {
-    return handoffConditionTimeout;
-  }
-
-  @JsonProperty public boolean isResetOffsetAutomatically() {
-    return resetOffsetAutomatically;
-  }
-
-  @Override @JsonProperty @Nullable public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
-    return segmentWriteOutMediumFactory;
-  }
-
-  @JsonProperty public Period getIntermediateHandoffPeriod() {
-    return intermediateHandoffPeriod;
-  }
-
-  @JsonProperty public boolean isLogParseExceptions() {
-    return logParseExceptions;
-  }
-
-  @JsonProperty public int getMaxParseExceptions() {
-    return maxParseExceptions;
-  }
-
-  @JsonProperty public int getMaxSavedParseExceptions() {
-    return maxSavedParseExceptions;
-  }
-
-  public KafkaTuningConfig withBasePersistDirectory(File dir) {
-    return new KafkaTuningConfig(maxRowsInMemory,
-        maxBytesInMemory,
-        maxRowsPerSegment,
-        maxTotalRows,
-        intermediatePersistPeriod,
-        dir,
-        maxPendingPersists,
-        indexSpec,
-        true,
-        reportParseExceptions,
-        handoffConditionTimeout,
-        resetOffsetAutomatically,
-        segmentWriteOutMediumFactory,
-        intermediateHandoffPeriod,
-        logParseExceptions,
-        maxParseExceptions,
-        maxSavedParseExceptions);
-  }
-
-  @Override public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    KafkaTuningConfig that = (KafkaTuningConfig) o;
-    return maxRowsInMemory == that.maxRowsInMemory
-        && maxRowsPerSegment == that.maxRowsPerSegment
-        && maxBytesInMemory == that.maxBytesInMemory
-        && Objects.equals(maxTotalRows, that.maxTotalRows)
-        && maxPendingPersists == that.maxPendingPersists
-        && reportParseExceptions == that.reportParseExceptions
-        && handoffConditionTimeout == that.handoffConditionTimeout
-        && resetOffsetAutomatically == that.resetOffsetAutomatically
-        && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod)
-        && Objects.equals(basePersistDirectory, that.basePersistDirectory)
-        && Objects.equals(indexSpec, that.indexSpec)
-        && Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory)
-        && Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod)
-        && logParseExceptions == that.logParseExceptions
-        && maxParseExceptions == that.maxParseExceptions
-        && maxSavedParseExceptions == that.maxSavedParseExceptions;
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(maxRowsInMemory,
-        maxRowsPerSegment,
-        maxBytesInMemory,
-        maxTotalRows,
-        intermediatePersistPeriod,
-        basePersistDirectory,
-        maxPendingPersists,
-        indexSpec,
-        reportParseExceptions,
-        handoffConditionTimeout,
-        resetOffsetAutomatically,
-        segmentWriteOutMediumFactory,
-        intermediateHandoffPeriod,
-        logParseExceptions,
-        maxParseExceptions,
-        maxSavedParseExceptions);
-  }
-
-  @Override public String toString() {
-    return "KafkaTuningConfig{"
-        + "maxRowsInMemory="
-        + maxRowsInMemory
-        + ", maxRowsPerSegment="
-        + maxRowsPerSegment
-        + ", maxTotalRows="
-        + maxTotalRows
-        + ", maxBytesInMemory="
-        + maxBytesInMemory
-        + ", intermediatePersistPeriod="
-        + intermediatePersistPeriod
-        + ", basePersistDirectory="
-        + basePersistDirectory
-        + ", maxPendingPersists="
-        + maxPendingPersists
-        + ", indexSpec="
-        + indexSpec
-        + ", reportParseExceptions="
-        + reportParseExceptions
-        + ", handoffConditionTimeout="
-        + handoffConditionTimeout
-        + ", resetOffsetAutomatically="
-        + resetOffsetAutomatically
-        + ", segmentWriteOutMediumFactory="
-        + segmentWriteOutMediumFactory
-        + ", intermediateHandoffPeriod="
-        + intermediateHandoffPeriod
-        + ", logParseExceptions="
-        + logParseExceptions
-        + ", maxParseExceptions="
-        + maxParseExceptions
-        + ", maxSavedParseExceptions="
-        + maxSavedParseExceptions
-        + '}';
-  }
-}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java
new file mode 100644
index 0000000..289f0e8
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.indexing.RealtimeTuningConfig;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Objects;
+
+/**
+ * Base tuning configuration for seekable-stream indexing tasks. Most nullable constructor arguments are
+ * resolved against defaults once, at construction time; the resolved values are exposed through getters.
+ *
+ * <p>This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig {
+  private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
+  private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false;
+
+  private final int maxRowsInMemory;
+  private final long maxBytesInMemory;
+  private final DynamicPartitionsSpec partitionsSpec; // carries maxRowsPerSegment and maxTotalRows
+  private final Period intermediatePersistPeriod;
+  private final File basePersistDirectory;
+  @Deprecated
+  private final int maxPendingPersists;
+  private final IndexSpec indexSpec;
+  private final IndexSpec indexSpecForIntermediatePersists;
+  private final boolean reportParseExceptions;
+  private final long handoffConditionTimeout;
+  private final boolean resetOffsetAutomatically;
+  @Nullable
+  private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
+  private final Period intermediateHandoffPeriod;
+  private final boolean skipSequenceNumberAvailabilityCheck;
+
+  private final boolean logParseExceptions;
+  private final int maxParseExceptions;
+  private final int maxSavedParseExceptions;
+
+  /**
+   * Resolves each option against its default. {@code buildV9Directly} is accepted so that existing configs
+   * can still be read, but its value is never used. When {@code reportParseExceptions} resolves to true,
+   * {@code maxParseExceptions} is forced to 0 and {@code maxSavedParseExceptions} is capped at 1.
+   */
+  public SeekableStreamIndexTaskTuningConfig(
+      @Nullable Integer maxRowsInMemory,
+      @Nullable Long maxBytesInMemory,
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Long maxTotalRows,
+      @Nullable Period intermediatePersistPeriod,
+      @Nullable File basePersistDirectory,
+      @Nullable Integer maxPendingPersists,
+      @Nullable IndexSpec indexSpec,
+      @Nullable IndexSpec indexSpecForIntermediatePersists,
+      // This parameter is left for compatibility when reading existing configs; its value is ignored.
+      @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
+      @Deprecated @Nullable Boolean reportParseExceptions,
+      @Nullable Long handoffConditionTimeout,
+      @Nullable Boolean resetOffsetAutomatically,
+      @Nullable Boolean skipSequenceNumberAvailabilityCheck,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable Period intermediateHandoffPeriod,
+      @Nullable Boolean logParseExceptions,
+      @Nullable Integer maxParseExceptions,
+      @Nullable Integer maxSavedParseExceptions
+  ) {
+    // Cannot be a static because default basePersistDirectory is unique per-instance
+    final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
+
+    this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
+    this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
+    // initializing this to 0, it will be lazily initialized to a value
+    // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+    this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
+    this.intermediatePersistPeriod = intermediatePersistPeriod == null
+        ? defaults.getIntermediatePersistPeriod()
+        : intermediatePersistPeriod;
+    this.basePersistDirectory = defaults.getBasePersistDirectory();
+    this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists;
+    this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
+    this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
+        this.indexSpec : indexSpecForIntermediatePersists;
+    this.reportParseExceptions = reportParseExceptions == null
+        ? defaults.isReportParseExceptions()
+        : reportParseExceptions;
+    this.handoffConditionTimeout = handoffConditionTimeout == null
+        ? defaults.getHandoffConditionTimeout()
+        : handoffConditionTimeout;
+    this.resetOffsetAutomatically = resetOffsetAutomatically == null
+        ? DEFAULT_RESET_OFFSET_AUTOMATICALLY
+        : resetOffsetAutomatically;
+    this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
+    this.intermediateHandoffPeriod = intermediateHandoffPeriod == null
+        ? new Period().withDays(Integer.MAX_VALUE)
+        : intermediateHandoffPeriod;
+    this.skipSequenceNumberAvailabilityCheck = skipSequenceNumberAvailabilityCheck == null
+        ? DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK
+        : skipSequenceNumberAvailabilityCheck;
+
+    if (this.reportParseExceptions) {
+      this.maxParseExceptions = 0;
+      this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions);
+    } else {
+      this.maxParseExceptions = maxParseExceptions == null
+          ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS
+          : maxParseExceptions;
+      this.maxSavedParseExceptions = maxSavedParseExceptions == null
+          ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS
+          : maxSavedParseExceptions;
+    }
+    this.logParseExceptions = logParseExceptions == null
+        ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS
+        : logParseExceptions;
+  }
+
+  /** Returns the in-memory row limit; the realtime default applies when none was configured. */
+  @Override @JsonProperty public int getMaxRowsInMemory() {
+    return maxRowsInMemory;
+  }
+
+  /**
+   * Returns the in-memory byte limit. 0 is stored when none was configured; see the constructor note.
+   */
+  @Override @JsonProperty public long getMaxBytesInMemory() {
+    return maxBytesInMemory;
+  }
+
+  /** Returns the per-segment row limit stored in the partitions spec. */
+  @Override @JsonProperty public Integer getMaxRowsPerSegment() {
+    return partitionsSpec.getMaxRowsPerSegment();
+  }
+
+  /** Returns the total row limit stored in the partitions spec. */
+  @Override @JsonProperty @Nullable public Long getMaxTotalRows() {
+    return partitionsSpec.getMaxTotalRows();
+  }
+
+  /** Returns the partitions spec built from {@code maxRowsPerSegment} and {@code maxTotalRows}. */
+  @Override public DynamicPartitionsSpec getPartitionsSpec() {
+    return partitionsSpec;
+  }
+
+  /** Returns the intermediate persist period; the realtime default applies when none was configured. */
+  @Override @JsonProperty public Period getIntermediatePersistPeriod() {
+    return intermediatePersistPeriod;
+  }
+
+  /**
+   * Returns the persist directory taken from the realtime defaults that the constructor builds.
+   */
+  @Override @JsonProperty public File getBasePersistDirectory() {
+    return basePersistDirectory;
+  }
+
+  /**
+   * Returns the pending-persist limit; 0 when none was configured.
+   */
+  @Override @JsonProperty @Deprecated public int getMaxPendingPersists() {
+    return maxPendingPersists;
+  }
+
+  /** Returns the index spec; the realtime default applies when none was configured. */
+  @Override @JsonProperty public IndexSpec getIndexSpec() {
+    return indexSpec;
+  }
+
+  /** Returns the index spec for intermediate persists; falls back to {@link #getIndexSpec()}. */
+  @JsonProperty @Override public IndexSpec getIndexSpecForIntermediatePersists() {
+    return indexSpecForIntermediatePersists;
+  }
+
+  /**
+   * Always returns true, doesn't affect the version being built.
+   */
+  @Deprecated @JsonProperty public boolean getBuildV9Directly() {
+    return true;
+  }
+
+  /**
+   * Returns whether parse exceptions are reported. When true, the constructor forces maxParseExceptions to 0.
+   */
+  @Override @JsonProperty public boolean isReportParseExceptions() {
+    return reportParseExceptions;
+  }
+
+  /** Returns the handoff condition timeout; the realtime default applies when none was configured. */
+  @JsonProperty public long getHandoffConditionTimeout() {
+    return handoffConditionTimeout;
+  }
+
+  /** Returns whether offsets are reset automatically; false unless configured. */
+  @JsonProperty public boolean isResetOffsetAutomatically() {
+    return resetOffsetAutomatically;
+  }
+
+  /** Returns the configured segment write-out medium factory, or {@code null} when none was given. */
+  @Override @JsonProperty @Nullable public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
+    return segmentWriteOutMediumFactory;
+  }
+
+  /** Returns the intermediate handoff period; {@code Integer.MAX_VALUE} days unless configured. */
+  @JsonProperty public Period getIntermediateHandoffPeriod() {
+    return intermediateHandoffPeriod;
+  }
+
+  /** Returns whether parse exceptions are logged. */
+  @JsonProperty public boolean isLogParseExceptions() {
+    return logParseExceptions;
+  }
+
+  /** Returns the parse-exception limit; always 0 when parse exceptions are reported. */
+  @JsonProperty public int getMaxParseExceptions() {
+    return maxParseExceptions;
+  }
+
+  /** Returns the saved parse-exception limit; at most 1 when parse exceptions are reported. */
+  @JsonProperty public int getMaxSavedParseExceptions() {
+    return maxSavedParseExceptions;
+  }
+
+  /** Returns whether the sequence-number availability check is skipped; false unless configured. */
+  @JsonProperty public boolean isSkipSequenceNumberAvailabilityCheck() {
+    return skipSequenceNumberAvailabilityCheck;
+  }
+
+  /** Subclasses are expected to return a config whose base persist directory is {@code dir}. */
+  @Override public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir);
+
+  /** Compares by concrete class first and then field by field. */
+  @Override public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o;
+    return maxRowsInMemory == that.maxRowsInMemory &&
+        maxBytesInMemory == that.maxBytesInMemory &&
+        maxPendingPersists == that.maxPendingPersists &&
+        reportParseExceptions == that.reportParseExceptions &&
+        handoffConditionTimeout == that.handoffConditionTimeout &&
+        resetOffsetAutomatically == that.resetOffsetAutomatically &&
+        skipSequenceNumberAvailabilityCheck == that.skipSequenceNumberAvailabilityCheck &&
+        logParseExceptions == that.logParseExceptions &&
+        maxParseExceptions == that.maxParseExceptions &&
+        maxSavedParseExceptions == that.maxSavedParseExceptions &&
+        Objects.equals(partitionsSpec, that.partitionsSpec) &&
+        Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) &&
+        Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
+        Objects.equals(indexSpec, that.indexSpec) &&
+        Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) &&
+        Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) &&
+        Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod);
+  }
+
+  /** Hashes the same fields that {@link #equals(Object)} compares. */
+  @Override public int hashCode() {
+    return Objects.hash(
+        maxRowsInMemory,
+        maxBytesInMemory,
+        partitionsSpec,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod,
+        skipSequenceNumberAvailabilityCheck,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+  }
+
+  /** Subclasses must supply their own string form. */
+  @Override public abstract String toString();
+}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java
new file mode 100644
index 0000000..ec71518
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+/**
+ * Supervisor tuning options; copied from druid source code to avoid depending on druid-indexing-service.
+ */
+public interface SeekableStreamSupervisorTuningConfig {
+  int DEFAULT_CHAT_RETRIES = 8;
+  String DEFAULT_HTTP_TIMEOUT = "PT10S";
+  String DEFAULT_SHUTDOWN_TIMEOUT = "PT80S";
+  String DEFAULT_REPARTITION_TRANSITION_DURATION = "PT2M";
+
+  /** Converts {@code period} to a standard duration, parsing {@code theDefault} instead when it is null. */
+  static Duration defaultDuration(final Period period, final String theDefault) {
+    return (period != null ? period : new Period(theDefault)).toStandardDuration();
+  }
+
+  /** Returns the worker thread count. */
+  @JsonProperty Integer getWorkerThreads();
+
+  /** Returns the chat thread count. */
+  @JsonProperty Integer getChatThreads();
+
+  /** Returns the number of chat retries. */
+  @JsonProperty Long getChatRetries();
+
+  /** Returns the HTTP timeout. */
+  @JsonProperty Duration getHttpTimeout();
+
+  /** Returns the shutdown timeout. */
+  @JsonProperty Duration getShutdownTimeout();
+
+  /** Returns the repartition transition duration. */
+  @JsonProperty Duration getRepartitionTransitionDuration();
+
+  /** Produces the index-task tuning config that corresponds to this supervisor config. */
+  SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig();
+}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 4142e48..19379e1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -128,7 +128,8 @@ public abstract class DruidQueryRecordReader<R extends Comparable<R>> extends Re
           // We got exception while querying results from this host.
           CloseQuietly.close(iterator);
         }
-        LOG.error("Failure getting results for query[{}] from host[{}] because of [{}]", query, address, e.getMessage());
+        LOG.error("Failure getting results for query[{}] from host[{}] because of [{}]",
+            query, address, e.getMessage());
         if (ex == null) {
           ex = e;
         } else {
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
deleted file mode 100644
index 907558f..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid.serde;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-
-import com.fasterxml.jackson.databind.JavaType;
-import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.druid.conf.DruidConstants;
-import org.apache.hadoop.io.NullWritable;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-
-import org.apache.druid.query.Result;
-import org.apache.druid.query.select.EventHolder;
-import org.apache.druid.query.select.SelectResultValue;
-
-/**
- * Record reader for results for Druid SelectQuery.
- */
-public class DruidSelectQueryRecordReader extends DruidQueryRecordReader<Result<SelectResultValue>> {
-
-  private static final TypeReference<Result<SelectResultValue>>
-      TYPE_REFERENCE =
-      new TypeReference<Result<SelectResultValue>>() {
-      };
-
-  private Iterator<EventHolder> values = Collections.emptyIterator();
-
-  @Override protected JavaType getResultTypeDef() {
-    return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE);
-  }
-
-  @Override public boolean nextKeyValue() throws IOException {
-    if (values.hasNext()) {
-      return true;
-    }
-    if (getQueryResultsIterator().hasNext()) {
-      Result<SelectResultValue> current = getQueryResultsIterator().next();
-      values = current.getValue().getEvents().iterator();
-      return nextKeyValue();
-    }
-    return false;
-  }
-
-  @Override public NullWritable getCurrentKey() throws IOException, InterruptedException {
-    return NullWritable.get();
-  }
-
-  @Override public DruidWritable getCurrentValue() throws IOException, InterruptedException {
-    // Create new value
-    DruidWritable value = new DruidWritable(false);
-    EventHolder e = values.next();
-    value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis());
-    value.getValue().putAll(e.getEvent());
-    return value;
-  }
-
-  @Override public boolean next(NullWritable key, DruidWritable value) throws IOException {
-    if (nextKeyValue()) {
-      // Update value
-      value.getValue().clear();
-      EventHolder e = values.next();
-      value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis());
-      value.getValue().putAll(e.getEvent());
-      return true;
-    }
-    return false;
-  }
-
-  @Override public float getProgress() {
-    return getQueryResultsIterator().hasNext() || values.hasNext() ? 0 : 1;
-  }
-
-}
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 0b2072c..7d94f1a 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -93,12 +93,8 @@ public class TestDruidStorageHandler {
   private DataSegment createSegment(String location, Interval interval, String version, ShardSpec shardSpec)
       throws IOException {
     FileUtils.writeStringToFile(new File(location), "dummySegmentData");
-    return DataSegment.builder()
-        .dataSource(DATA_SOURCE_NAME)
-        .version(version)
-        .interval(interval)
-        .shardSpec(shardSpec)
-        .loadSpec(ImmutableMap.of("path", location))
+    return DataSegment.builder().dataSource(DATA_SOURCE_NAME).version(version).interval(interval).shardSpec(shardSpec)
+        .loadSpec(ImmutableMap.of("path", location)).size(1000L)
         .build();
   }
 
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index 58f4a44..2bcbb14 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -141,47 +141,6 @@ import org.junit.Test;
           + "\"context\":{\"queryId\":\"\"},"
           + "\"descending\":false}, [localhost:8082]}]";
 
-  private static final String
-      SELECT_QUERY =
-      "{   \"queryType\": \"select\",  "
-          + " \"dataSource\": \"wikipedia\",   \"descending\": \"false\",  "
-          + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\","
-          + "\"newpage\",\"user\"],  "
-          + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],  "
-          + " \"granularity\": \"all\",  "
-          + " \"intervals\": [     \"2013-01-01T00:00:00.000-08:00/2013-01-02T00:00:00.000-08:00\"   ],  "
-          + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5}, "
-          + " \"context\":{\"druid.query.fetch\":true}}";
-  private static final String
-      SELECT_QUERY_SPLIT =
-      "[HiveDruidSplit{{\"queryType\":\"select\","
-          + "\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"},"
-          + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-01-01T08:00:00"
-          + ".000Z/2013-01-02T08:00:00.000Z\"]},"
-          + "\"descending\":false,"
-          + "\"filter\":null,"
-          + "\"granularity\":{\"type\":\"all\"},"
-          + "\"dimensions\":[{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"robot\",\"outputName\":\"robot\","
-          + "\"outputType\":\"STRING\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"namespace\",\"outputName\":\"namespace\","
-          + "\"outputType\":\"STRING\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"anonymous\",\"outputName\":\"anonymous\","
-          + "\"outputType\":\"STRING\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"unpatrolled\",\"outputName\":\"unpatrolled\","
-          + "\"outputType\":\"STRING\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"page\",\"outputName\":\"page\","
-          + "\"outputType\":\"STRING\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"language\",\"outputName\":\"language\","
-          + "\"outputType\":\"STRING\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"newpage\",\"outputName\":\"newpage\","
-          + "\"outputType\":\"STRING\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"user\",\"outputName\":\"user\","
-          + "\"outputType\":\"STRING\"}],"
-          + "\"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],"
-          + "\"virtualColumns\":[],"
-          + "\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":5,\"fromNext\":false},"
-          + "\"context\":{\"druid.query.fetch\":true,\"queryId\":\"\"}}, [localhost:8082]}]";
-
   @Test
   public void testTimeZone() throws Exception {
     DruidQueryBasedInputFormat input = new DruidQueryBasedInputFormat();
@@ -202,9 +161,6 @@ import org.junit.Test;
     resultSplits = (HiveDruidSplit[]) method1.invoke(input, conf);
     assertEquals(GROUP_BY_QUERY_SPLIT, Arrays.toString(resultSplits));
 
-    conf = createPropertiesQuery("sample_datasource", Query.SELECT, SELECT_QUERY);
-    resultSplits = (HiveDruidSplit[]) method1.invoke(input, conf);
-    assertEquals(SELECT_QUERY_SPLIT, Arrays.toString(resultSplits));
   }
 
   private static Configuration createPropertiesQuery(String dataSource, String queryType, String jsonQuery) {
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
index edfcc65..b2a871e 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
@@ -83,7 +83,6 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.Result;
-import org.apache.druid.query.select.SelectResultValue;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
 import org.apache.druid.query.topn.TopNResultValue;
 import org.junit.rules.ExpectedException;
@@ -92,7 +91,8 @@ import org.junit.rules.ExpectedException;
  * Basic tests for Druid SerDe. The examples are taken from Druid 0.9.1.1
  * documentation.
  */
-@SuppressWarnings({ "SameParameterValue", "SpellCheckingInspection" }) public class TestDruidSerDe {
+@SuppressWarnings({"SameParameterValue", "SpellCheckingInspection"})
+public class TestDruidSerDe {
   // Timeseries query
   private static final String
       TIMESERIES_QUERY =
@@ -156,12 +156,12 @@ import org.junit.rules.ExpectedException;
           new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC)),
           0L,
           1.0F,
-          2.2222F },
+          2.2222F},
       new Object[]{
           new TimestampTZ(Instant.ofEpochMilli(1325462400000L).atZone(ZoneOffset.UTC)),
           2L,
           3.32F,
-          4F } };
+          4F}};
 
   // Timeseries query results as records (types defined by metastore)
   private static final String TIMESERIES_COLUMN_NAMES = "timestamp,sample_name1,sample_name2,sample_divide";
@@ -269,19 +269,19 @@ import org.junit.rules.ExpectedException;
   private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][]{
       new Object[]{
           new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)),
-          "dim1_val", 111L, 10669F, 96.11711711711712F },
+          "dim1_val", 111L, 10669F, 96.11711711711712F},
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
-          "another_dim1_val", 88L, 28344F, 322.09090909090907F },
+          "another_dim1_val", 88L, 28344F, 322.09090909090907F},
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
-          "dim1_val3", 70L, 871F, 12.442857142857143F },
+          "dim1_val3", 70L, 871F, 12.442857142857143F},
       new Object[]{
           new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)),
-          "dim1_val4", 62L, 815F, 13.14516129032258F },
+          "dim1_val4", 62L, 815F, 13.14516129032258F},
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
-          "dim1_val5", 60L, 2787F, 46.45F } };
+          "dim1_val5", 60L, 2787F, 46.45F}};
 
   // TopN query results as records (types defined by metastore)
   private static final String TOPN_COLUMN_NAMES = "timestamp,sample_dim,count,some_metric,sample_divide";
@@ -422,24 +422,24 @@ import org.junit.rules.ExpectedException;
   private static final Object[][] GROUP_BY_QUERY_EXTRACTION_RESULTS_RECORDS = new Object[][]{
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))),
-          (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), 200L },
+          (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), 200L},
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))),
-          (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), 400L } };
+          (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), 400L}};
 
   private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][]{
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), "India",
-          "phone", 88L, 29.91233453, 60.32F },
+          "phone", 88L, 29.91233453, 60.32F},
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))),
-          "Spain", "pc", 16L, 172.93494959, 6.333333F } };
+          "Spain", "pc", 16L, 172.93494959, 6.333333F}};
 
   private static final Object[][] GB_MONTH_EXTRACTION_RESULTS_RECORDS = new Object[][]{
       new Object[]{
-          (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), 1, 200L },
+          (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), 1, 200L},
       new Object[]{
-          (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), 1, 400L } };
+          (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), 1, 400L}};
 
   // GroupBy query results as records (types defined by metastore)
   private static final String GROUP_BY_COLUMN_NAMES = "timestamp,country,device,total_usage,data_transfer,avg_usage";
@@ -455,146 +455,14 @@ import org.junit.rules.ExpectedException;
   private static final String GB_MONTH_EXTRACTIONS_COLUMN_NAMES = "timestamp,extract_month,$f1";
   private static final String GB_MONTH_EXTRACTIONS_COLUMN_TYPES = "timestamp with local time zone,int,bigint";
 
-  // Select query
-  private static final String
-      SELECT_QUERY =
-      "{   \"queryType\": \"select\",  "
-          + " \"dataSource\": \"wikipedia\",   \"descending\": \"false\",  "
-          + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\","
-          + "\"newpage\",\"user\"],  "
-          + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],  "
-          + " \"granularity\": \"all\",  "
-          + " \"intervals\": [     \"2013-01-01/2013-01-02\"   ],  "
-          + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }";
-
-  // Select query results
-  private static final String
-      SELECT_QUERY_RESULTS =
-      "[{ "
-          + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
-          + " \"result\" : {  "
-          + "  \"pagingIdentifiers\" : {   "
-          + "   \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4    }, "
-          + "   \"events\" : [ {  "
-          + "    \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00"
-          + ".000Z_2013-01-10T08:13:47.830Z_v9\",  "
-          + "    \"offset\" : 0,  "
-          + "    \"event\" : {   "
-          + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
-          + "     \"robot\" : 1,   "
-          + "     \"namespace\" : \"article\",   "
-          + "     \"anonymous\" : \"0\",   "
-          + "     \"unpatrolled\" : \"0\",   "
-          + "     \"page\" : \"11._korpus_(NOVJ)\",   "
-          + "     \"language\" : \"sl\",   "
-          + "     \"newpage\" : \"0\",   "
-          + "     \"user\" : \"EmausBot\",   "
-          + "     \"count\" : 1.0,   "
-          + "     \"added\" : 39.0,   "
-          + "     \"delta\" : 39.0,   "
-          + "     \"variation\" : 39.0,   "
-          + "     \"deleted\" : 0.0  "
-          + "    } "
-          + "   }, {  "
-          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47"
-          + ".830Z_v9\",  "
-          + "    \"offset\" : 1,  "
-          + "    \"event\" : {   "
-          + "     \"timestamp\" : \"2013-01-01T00:00:00.000Z\",   "
-          + "     \"robot\" : 0,   "
-          + "     \"namespace\" : \"article\",   "
-          + "     \"anonymous\" : \"0\",   "
-          + "     \"unpatrolled\" : \"0\",   "
-          + "     \"page\" : \"112_U.S._580\",   "
-          + "     \"language\" : \"en\",   "
-          + "     \"newpage\" : \"1\",   "
-          + "     \"user\" : \"MZMcBride\",   "
-          + "     \"count\" : 1.0,   "
-          + "     \"added\" : 70.0,   "
-          + "     \"delta\" : 70.0,   "
-          + "     \"variation\" : 70.0,   "
-          + "     \"deleted\" : 0.0  "
-          + "    } "
-          + "   }, {  "
-          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47"
-          + ".830Z_v9\",  "
-          + "    \"offset\" : 2,  "
-          + "    \"event\" : {   "
-          + "     \"timestamp\" : \"2013-01-01T00:00:12.000Z\",   "
-          + "     \"robot\" : 0,   "
-          + "     \"namespace\" : \"article\",   "
-          + "     \"anonymous\" : \"0\",   "
-          + "     \"unpatrolled\" : \"0\",   "
-          + "     \"page\" : \"113_U.S._243\",   "
-          + "     \"language\" : \"en\",   "
-          + "     \"newpage\" : \"1\",   "
-          + "     \"user\" : \"MZMcBride\",   "
-          + "     \"count\" : 1.0,   "
-          + "     \"added\" : 77.0,   "
-          + "     \"delta\" : 77.0,   "
-          + "     \"variation\" : 77.0,   "
-          + "     \"deleted\" : 0.0  "
-          + "    } "
-          + "   }, {  "
-          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47"
-          + ".830Z_v9\",  "
-          + "    \"offset\" : 3,  "
-          + "    \"event\" : {   "
-          + "     \"timestamp\" : \"2013-01-01T00:00:12.000Z\",   "
-          + "     \"robot\" : 0,   "
-          + "     \"namespace\" : \"article\",   "
-          + "     \"anonymous\" : \"0\",   "
-          + "     \"unpatrolled\" : \"0\",   "
-          + "     \"page\" : \"113_U.S._73\",   "
-          + "     \"language\" : \"en\",   "
-          + "     \"newpage\" : \"1\",   "
-          + "     \"user\" : \"MZMcBride\",   "
-          + "     \"count\" : 1.0,   "
-          + "     \"added\" : 70.0,   "
-          + "     \"delta\" : 70.0,   "
-          + "     \"variation\" : 70.0,   "
-          + "     \"deleted\" : 0.0  "
-          + "    } "
-          + "   }, {  "
-          + "    \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47"
-          + ".830Z_v9\",  "
-          + "    \"offset\" : 4,  "
-          + "    \"event\" : {   "
-          + "     \"timestamp\" : \"2013-01-01T00:00:12.000Z\",   "
-          + "     \"robot\" : 0,   "
-          + "     \"namespace\" : \"article\",   "
-          + "     \"anonymous\" : \"0\",   "
-          + "     \"unpatrolled\" : \"0\",   "
-          + "     \"page\" : \"113_U.S._756\",   "
-          + "     \"language\" : \"en\",   "
-          + "     \"newpage\" : \"1\",   "
-          + "     \"user\" : \"MZMcBride\",   "
-          + "     \"count\" : 1.0,   "
-          + "     \"added\" : 68.0,   "
-          + "     \"delta\" : 68.0,   "
-          + "     \"variation\" : 68.0,   "
-          + "     \"deleted\" : 0.0  "
-          + "    } "
-          + "   } ]  }} ]";
-
-  // Select query results as records (types defined by metastore)
-  private static final String
-      SELECT_COLUMN_NAMES =
+  private static final String SCAN_COLUMN_NAMES =
       "__time,robot,namespace,anonymous,unpatrolled,page,language,newpage,user,count,added,delta,variation,deleted";
-  private static final String
-      SELECT_COLUMN_TYPES =
+  private static final String SCAN_COLUMN_TYPES =
       "timestamp with local time zone,boolean,string,string,string,string,string,string,string,double,double,float,"
           + "float,float";
-  private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][]{
-      new Object[]{
-          (new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.TRUE,
-          "article",
-          "0",
-          "0",
-          "11._korpus_(NOVJ)",
-          "sl",
-          "0",
-          "EmausBot", 1.0d, 39.0d, 39.0F, 39.0F, 0.0F },
+  private static final Object[][] SCAN_QUERY_RESULTS_RECORDS = new Object[][]{
+      new Object[]{(new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.TRUE,
+          "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0d, 39.0d, 39.0F, 39.0F, 0.0F},
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.FALSE,
           "article",
@@ -603,7 +471,7 @@ import org.junit.rules.ExpectedException;
           "112_U.S._580",
           "en",
           "1",
-          "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F },
+          "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F},
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), Boolean.FALSE,
           "article",
@@ -612,7 +480,7 @@ import org.junit.rules.ExpectedException;
           "113_U.S._243",
           "en",
           "1",
-          "MZMcBride", 1.0d, 77.0d, 77.0F, 77.0F, 0.0F },
+          "MZMcBride", 1.0d, 77.0d, 77.0F, 77.0F, 0.0F},
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), Boolean.FALSE,
           "article",
@@ -621,7 +489,7 @@ import org.junit.rules.ExpectedException;
           "113_U.S._73",
           "en",
           "1",
-          "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F },
+          "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F},
       new Object[]{
           (new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), Boolean.FALSE,
           "article",
@@ -630,7 +498,7 @@ import org.junit.rules.ExpectedException;
           "113_U.S._756",
           "en",
           "1",
-          "MZMcBride", 1.0d, 68.0d, 68.0F, 68.0F, 0.0F } };
+          "MZMcBride", 1.0d, 68.0d, 68.0F, 68.0F, 0.0F}};
 
   // Scan query
   private static final String
@@ -664,7 +532,8 @@ import org.junit.rules.ExpectedException;
           + ".0,68.0,68.0,68.0,0.0]"
           + "]}]";
 
-  @Before public void setup() throws IOException {
+  @Before
+  public void setup() throws IOException {
     tsQueryResults =
         DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(
             TIMESERIES_QUERY_RESULTS,
@@ -691,12 +560,6 @@ import org.junit.rules.ExpectedException;
             GB_MONTH_EXTRACTIONS_RESULTS,
             new TypeReference<List<Row>>() {
             }));
-    selectQueryResults =
-        DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(
-            SELECT_QUERY_RESULTS,
-            new TypeReference<List<Result<SelectResultValue>>>() {
-            }));
-
     scanQueryResults =
         DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(
             SCAN_QUERY_RESULTS,
@@ -704,7 +567,8 @@ import org.junit.rules.ExpectedException;
             }));
   }
 
-  @Test public void testDruidDeserializer()
+  @Test
+  public void testDruidDeserializer()
       throws SerDeException, NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException,
       IOException, InterruptedException, NoSuchMethodException, InvocationTargetException {
     // Create, initialize, and test the SerDe
@@ -765,22 +629,18 @@ import org.junit.rules.ExpectedException;
         GB_MONTH_EXTRACTIONS,
         groupByMonthExtractQueryResults,
         GB_MONTH_EXTRACTION_RESULTS_RECORDS);
-    // Select query
-    tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY, SELECT_COLUMN_NAMES, SELECT_COLUMN_TYPES);
-    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
-    deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY, selectQueryResults, SELECT_QUERY_RESULTS_RECORDS);
 
     // Scan query -- results should be same as select query
-    tbl = createPropertiesQuery("wikipedia", Query.SCAN, SCAN_QUERY, SELECT_COLUMN_NAMES, SELECT_COLUMN_TYPES);
+    tbl = createPropertiesQuery("wikipedia", Query.SCAN, SCAN_QUERY, SCAN_COLUMN_NAMES, SCAN_COLUMN_TYPES);
     SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
-    deserializeQueryResults(serDe, Query.SCAN, SCAN_QUERY, scanQueryResults, SELECT_QUERY_RESULTS_RECORDS);
+    deserializeQueryResults(serDe, Query.SCAN, SCAN_QUERY, scanQueryResults, SCAN_QUERY_RESULTS_RECORDS);
   }
 
   private static Properties createPropertiesQuery(String dataSource,
-      String queryType,
-      String jsonQuery,
-      String columnNames,
-      String columnTypes) {
+                                                  String queryType,
+                                                  String jsonQuery,
+                                                  String columnNames,
+                                                  String columnTypes) {
     Properties tbl = new Properties();
 
     // Set the configuration parameters
@@ -792,11 +652,12 @@ import org.junit.rules.ExpectedException;
     return tbl;
   }
 
-  @SuppressWarnings("unchecked") private void deserializeQueryResults(DruidSerDe serDe,
-      String queryType,
-      String jsonQuery,
-      byte[] resultString,
-      Object[][] records)
+  @SuppressWarnings("unchecked")
+  private void deserializeQueryResults(DruidSerDe serDe,
+                                       String queryType,
+                                       String jsonQuery,
+                                       byte[] resultString,
+                                       Object[][] records)
       throws SerDeException, IOException, NoSuchFieldException, SecurityException, IllegalArgumentException,
       IllegalAccessException, InterruptedException, NoSuchMethodException, InvocationTargetException {
 
@@ -886,7 +747,8 @@ import org.junit.rules.ExpectedException;
           .put("__time_granularity", 1377907200000L)
           .build());
 
-  @Test public void testDruidObjectSerializer()
+  @Test
+  public void testDruidObjectSerializer()
       throws SerDeException, NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException,
       IOException, InterruptedException, NoSuchMethodException, InvocationTargetException {
     // Create, initialize, and test the SerDe
@@ -899,9 +761,11 @@ import org.junit.rules.ExpectedException;
     serializeObject(tbl, serDe, ROW_OBJECT, DRUID_WRITABLE);
   }
 
-  @Rule public ExpectedException expectedEx = ExpectedException.none();
+  @Rule
+  public ExpectedException expectedEx = ExpectedException.none();
 
-  @Test public void testDruidObjectSerializerwithNullTimestamp() throws Exception {
+  @Test
+  public void testDruidObjectSerializerwithNullTimestamp() throws Exception {
     // Create, initialize, and test the SerDe
     DruidSerDe serDe = new DruidSerDe();
     Configuration conf = new Configuration();
@@ -939,9 +803,9 @@ import org.junit.rules.ExpectedException;
   }
 
   private static void serializeObject(Properties properties,
-      DruidSerDe serDe,
-      Object[] rowObject,
-      DruidWritable druidWritable) throws SerDeException {
+                                      DruidSerDe serDe,
+                                      Object[] rowObject,
+                                      DruidWritable druidWritable) throws SerDeException {
     // Build OI with timestamp granularity column
     final List<String> columnNames = new ArrayList<>(Utilities.getColumnNames(properties));
     columnNames.add(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
@@ -991,7 +855,8 @@ import org.junit.rules.ExpectedException;
           .put("c8", (byte) 0)
           .build());
 
-  @Test public void testDruidObjectDeserializer()
+  @Test
+  public void testDruidObjectDeserializer()
       throws SerDeException, NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException,
       IOException, InterruptedException, NoSuchMethodException, InvocationTargetException {
     // Create, initialize, and test the SerDe
@@ -1004,9 +869,10 @@ import org.junit.rules.ExpectedException;
     deserializeObject(serDe, ROW_OBJECT_2, DRUID_WRITABLE_2);
   }
 
-  @SuppressWarnings("unchecked") private static void deserializeObject(DruidSerDe serDe,
-      Object[] rowObject,
-      DruidWritable druidWritable) throws SerDeException {
+  @SuppressWarnings("unchecked")
+  private static void deserializeObject(DruidSerDe serDe,
+                                        Object[] rowObject,
+                                        DruidWritable druidWritable) throws SerDeException {
     // Deserialize
     List<Object> object = (List<Object>) serDe.deserialize(druidWritable);
     // Check result
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index 91b5f8b..3fb7bdf 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -150,29 +150,14 @@ import java.util.stream.Collectors;
     RealtimeTuningConfig
         tuningConfig =
         new RealtimeTuningConfig(null,
-            null,
-            null,
-            null,
-            temporaryFolder.newFolder(),
-            null,
-            null,
-            null,
-            null,
-            indexSpec,
-            null,
-            0,
-            0,
-            null,
-            null,
-            0L,
-            null,
-                null);
+            null, null, null, temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, null, 0, 0, null,
+            null, 0L, null, null);
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
     DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig() {
       @Override public File getStorageDirectory() {
         return segmentOutputDir;
       }
-    }, objectMapper);
+    });
 
     Path
         segmentDescriptorPath =
diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index 9695486..fe6ad2e 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -37,6 +37,7 @@
   <!-- dependencies are always listed in sorted order by groupId, artifectId -->
   <properties>
     <hive.path.to.root>../..</hive.path.to.root>
+    <druid.avatica.version>1.15.0</druid.avatica.version>
     <druid.curator.version>4.0.0</druid.curator.version>
     <druid.jersey.version>1.19.3</druid.jersey.version>
     <druid.jetty.version>9.4.10.v20180503</druid.jetty.version>
@@ -44,6 +45,7 @@
     <druid.guava.version>16.0.1</druid.guava.version>
     <druid.guice.version>4.1.0</druid.guice.version>
     <kafka.test.version>2.0.0</kafka.test.version>
+    <druid.guice.version>4.1.0</druid.guice.version>
     <slf4j.version>1.7.30</slf4j.version>
   </properties>
       <dependencies>
@@ -164,6 +166,26 @@
           <artifactId>curator-recipes</artifactId>
           <version>${druid.curator.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.calcite.avatica</groupId>
+          <artifactId>avatica</artifactId>
+          <version>${avatica.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.calcite.avatica</groupId>
+          <artifactId>avatica-core</artifactId>
+          <version>${avatica.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.calcite.avatica</groupId>
+          <artifactId>avatica-metrics</artifactId>
+          <version>${avatica.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.calcite.avatica</groupId>
+          <artifactId>avatica-server</artifactId>
+          <version>${avatica.version}</version>
+        </dependency>
        <dependency>
           <groupId>com.sun.jersey</groupId>
           <artifactId>jersey-bundle</artifactId>
diff --git a/pom.xml b/pom.xml
index 1432bcf..89c37e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -146,7 +146,7 @@
     <derby.version>10.14.1.0</derby.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
-    <druid.version>0.14.0-incubating</druid.version>
+    <druid.version>0.17.1</druid.version>
     <flatbuffers.version>1.6.0.1</flatbuffers.version>
     <guava.version>19.0</guava.version>
     <groovy.version>2.4.11</groovy.version>
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out
index f6a417b..2331fba 100644
--- a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out
@@ -120,7 +120,7 @@ POSTHOOK: query: Select page FROM druid_kafka_test_delimited
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@druid_kafka_test_delimited
 POSTHOOK: Output: hdfs://### HDFS PATH ###
- "Gypsy Danger"
+"Gypsy Danger"
 "Striker Eureka"
 "Cherno Alpha"
 "Crimson Typhoon"
diff --git a/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out b/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out
index 25abb74..0dd2295 100644
--- a/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out
@@ -179,7 +179,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames cstring1
                     druid.fieldTypes string
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cstring1","lower":"DynamicValue(RS_4_alltypesorc_small_cstring1_min)","upp [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cstring1","lower":"DynamicValue(RS_4_alltypesorc_small_cstr [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -340,7 +340,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames ctinyint
                     druid.fieldTypes tinyint
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"ctinyint","lower":"DynamicValue(RS_4_alltypesorc_small_ctinyint_min)","upp [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"ctinyint","lower":"DynamicValue(RS_4_alltypesorc_small_ctin [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -501,7 +501,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames csmallint
                     druid.fieldTypes smallint
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"csmallint","lower":"DynamicValue(RS_4_alltypesorc_small_csmallint_min)","u [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"csmallint","lower":"DynamicValue(RS_4_alltypesorc_small_csm [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -662,7 +662,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames cint
                     druid.fieldTypes int
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cint","lower":"DynamicValue(RS_4_alltypesorc_small_cint_min)","upper":"Dyn [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cint","lower":"DynamicValue(RS_4_alltypesorc_small_cint_min [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -823,7 +823,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames cbigint
                     druid.fieldTypes bigint
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cbigint","lower":"DynamicValue(RS_4_alltypesorc_small_cbigint_min)","upper [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cbigint","lower":"DynamicValue(RS_4_alltypesorc_small_cbigi [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -984,7 +984,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames cfloat
                     druid.fieldTypes float
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"not","field":{"type":"selector","dimension":"cfloat","value":null,"extractionFn":null}},"columns":["cfloat"],"legacy":null,"context":null,"des [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"not","field":{"type":"selector","dimension":"cfloat","value":null,"extractionFn":null}},"columns":["cfloat"],"legacy":null,"con [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -1145,7 +1145,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames cdouble
                     druid.fieldTypes double
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdouble","lower":"DynamicValue(RS_4_alltypesorc_small_cdouble_min)","upper [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdouble","lower":"DynamicValue(RS_4_alltypesorc_small_cdoub [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -1295,7 +1295,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames vc
                     druid.fieldTypes timestamp with local time zone
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[{"type":"expression","name":"vc","expression":"\"__time\"","outputType":"LONG"}],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":null,"columns":["vc"],"legacy":null,"context":null,"descending":false,"g [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[{"type":"expression","name":"vc","expression":"\"__time\"","outputType":"LONG"}],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":null,"columns":["vc"],"legacy":null,"context":null,"desce [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 348640 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -1456,7 +1456,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames cboolean1
                     druid.fieldTypes boolean
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"not","field":{"type":"selector","dimension":"cboolean1","value":null,"extractionFn":null}},"columns":["cboolean1"],"legacy":null,"context":nul [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"not","field":{"type":"selector","dimension":"cboolean1","value":null,"extractionFn":null}},"columns":["cboolean1"],"legacy":nul [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -1617,7 +1617,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames cintstring
                     druid.fieldTypes string
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cintstring","lower":"DynamicValue(RS_4_alltypesorc_small_cint_min)","upper [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cintstring","lower":"DynamicValue(RS_4_alltypesorc_small_ci [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -1778,7 +1778,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames cdoublestring
                     druid.fieldTypes string
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdoublestring","lower":"DynamicValue(RS_4_alltypesorc_small_cdouble_min)", [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdoublestring","lower":"DynamicValue(RS_4_alltypesorc_small [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
@@ -1939,7 +1939,7 @@ STAGE PLANS:
                   properties:
                     druid.fieldNames cfloatstring
                     druid.fieldTypes string
-                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cfloatstring","lower":"DynamicValue(RS_4_alltypesorc_small_cfloat_min)","u [...]
+                    druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cfloatstring","lower":"DynamicValue(RS_4_alltypesorc_small_ [...]
                     druid.query.type scan
                   Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator