You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2017/02/22 10:18:30 UTC

[1/2] hive git commit: HIVE-15990: Always initialize connection properties in DruidSerDe (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master dc0938c42 -> 8ab1889dd


HIVE-15990: Always initialize connection properties in DruidSerDe (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8973d2c6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8973d2c6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8973d2c6

Branch: refs/heads/master
Commit: 8973d2c66394ed25b1baa20df3920870ae9b053c
Parents: dc0938c
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Mon Feb 20 17:32:46 2017 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed Feb 22 10:16:50 2017 +0000

----------------------------------------------------------------------
 .../hadoop/hive/druid/serde/DruidSerDe.java       | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8973d2c6/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 4235e89..bbe29b6 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -100,18 +100,21 @@ public class DruidSerDe extends AbstractSerDe {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
 
-  private String[] columns;
-
-  private PrimitiveTypeInfo[] types;
-
   private int numConnection;
-
   private Period readTimeout;
 
+  private String[] columns;
+  private PrimitiveTypeInfo[] types;
   private ObjectInspector inspector;
 
   @Override
   public void initialize(Configuration configuration, Properties properties) throws SerDeException {
+    // Init connection properties
+    numConnection = HiveConf
+          .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    readTimeout = new Period(
+          HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+
     final List<String> columnNames = new ArrayList<>();
     final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
     List<ObjectInspector> inspectors = new ArrayList<>();
@@ -173,11 +176,6 @@ public class DruidSerDe extends AbstractSerDe {
           throw new SerDeException("Druid broker address not specified in configuration");
         }
 
-        numConnection = HiveConf
-              .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-        readTimeout = new Period(
-              HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-
         // Infer schema
         SegmentAnalysis schemaInfo;
         try {


[2/2] hive git commit: HIVE-15928: Parallelization of Select queries in Druid handler (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Posted by jc...@apache.org.
HIVE-15928: Parallelization of Select queries in Druid handler (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8ab1889d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8ab1889d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8ab1889d

Branch: refs/heads/master
Commit: 8ab1889dd9afe958e96cc62fc973771f61cadcba
Parents: 8973d2c
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu Feb 16 14:40:41 2017 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed Feb 22 10:17:28 2017 +0000

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  10 +-
 .../druid/io/DruidQueryBasedInputFormat.java    | 124 ++++++++++++++++---
 .../hadoop/hive/druid/io/HiveDruidSplit.java    |  30 ++---
 .../druid/serde/DruidQueryRecordReader.java     |   3 +-
 .../TestHiveDruidQueryBasedInputFormat.java     |  21 ++--
 5 files changed, 132 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8ab1889d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7c88f4f..3777fa9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1933,12 +1933,20 @@ public class HiveConf extends Configuration {
     HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS("hive.druid.coordinator.address.default", "localhost:8081",
             "Address of the Druid coordinator. It is used to check the load status of newly created segments"
     ),
+    HIVE_DRUID_SELECT_DISTRIBUTE("hive.druid.select.distribute", true,
+        "If it is set to true, we distribute the execution of Druid Select queries. Concretely, we retrieve\n" +
+        "the result for Select queries directly from the Druid nodes containing the segments data.\n" +
+        "In particular, first we contact the Druid broker node to obtain the nodes containing the segments\n" +
+        "for the given query, and then we contact those nodes to retrieve the results for the query.\n" +
+        "If it is set to false, we do not execute the Select queries in a distributed fashion. Instead, results\n" +
+        "for those queries are returned by the Druid broker node."),
     HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
+        "Takes only effect when hive.druid.select.distribute is set to false. \n" +
         "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
         "per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
         "number of records of the query results is larger than this threshold, we split the query in\n" +
         "total number of rows/threshold parts across the time dimension. Note that we assume the\n" +
-        "records to be split uniformly across the time dimension"),
+        "records to be split uniformly across the time dimension."),
     HIVE_DRUID_NUM_HTTP_CONNECTION("hive.druid.http.numConnection", 20, "Number of connections used by\n" +
         "the HTTP client."),
     HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/8ab1889d/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 8b37840..0b35428 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.druid.io;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.joda.time.chrono.ISOChronology;
@@ -60,23 +62,28 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.collect.Lists;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.HttpClientConfig;
 import com.metamx.http.client.HttpClientInit;
+import com.metamx.http.client.Request;
 
 import io.druid.query.BaseQuery;
 import io.druid.query.Druids;
 import io.druid.query.Druids.SegmentMetadataQueryBuilder;
 import io.druid.query.Druids.SelectQueryBuilder;
 import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.LocatedSegmentDescriptor;
 import io.druid.query.Query;
 import io.druid.query.Result;
+import io.druid.query.SegmentDescriptor;
 import io.druid.query.metadata.metadata.SegmentAnalysis;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery;
 import io.druid.query.select.PagingSpec;
 import io.druid.query.select.SelectQuery;
 import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.spec.MultipleSpecificSegmentSpec;
 import io.druid.query.timeboundary.TimeBoundaryQuery;
 import io.druid.query.timeboundary.TimeBoundaryResultValue;
 
@@ -143,12 +150,17 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
       case Query.TIMESERIES:
       case Query.TOPN:
       case Query.GROUP_BY:
-        return new HiveDruidSplit[] { new HiveDruidSplit(address,
-                deserializeSerialize(druidQuery), paths[0]) };
+        return new HiveDruidSplit[] { new HiveDruidSplit(deserializeSerialize(druidQuery),
+                paths[0], new String[] {address}) };
       case Query.SELECT:
         SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
                 druidQuery, SelectQuery.class);
-        return splitSelectQuery(conf, address, selectQuery, paths[0]);
+        boolean distributed = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE);
+        if (distributed) {
+          return distributeSelectQuery(conf, address, selectQuery, paths[0]);
+        } else {
+          return splitSelectQuery(conf, address, selectQuery, paths[0]);
+        }
       default:
         throw new IOException("Druid query type not recognized");
     }
@@ -166,8 +178,83 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
   }
 
+  /* Method that distributes the Select query by creating one split per segment,
+   * each carrying the Druid nodes that hold the data for that segment of the
+   * given query. */
+  private static HiveDruidSplit[] distributeSelectQuery(Configuration conf, String address,
+      SelectQuery query, Path dummyPath) throws IOException {
+    // If it has a limit, we use it and we do not distribute the query
+    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+    if (isFetch) {
+      return new HiveDruidSplit[] { new HiveDruidSplit(
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+              new String[]{address} ) };
+    }
+
+    // Properties from configuration
+    final int numConnection = HiveConf.getIntVar(conf,
+            HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    final Period readTimeout = new Period(
+            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+
+    // Create request to obtain nodes that are holding data for the given datasource and intervals
+    final Lifecycle lifecycle = new Lifecycle();
+    final HttpClient client = HttpClientInit.createClient(
+            HttpClientConfig.builder().withNumConnections(numConnection)
+                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Lifecycle start issue");
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+    final String intervals =
+            StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
+    final String request = String.format(
+            "http://%s/druid/v2/datasources/%s/candidates?intervals=%s",
+            address, query.getDataSource().getNames().get(0), intervals);
+    final InputStream response;
+    try {
+      response = DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, new URL(request)));
+    } catch (Exception e) {
+      lifecycle.stop();
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+
+    // Retrieve results
+    final List<LocatedSegmentDescriptor> segmentDescriptors;
+    try {
+      segmentDescriptors = DruidStorageHandlerUtils.JSON_MAPPER.readValue(response,
+              new TypeReference<List<LocatedSegmentDescriptor>>() {});
+    } catch (Exception e) {
+      response.close();
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    } finally {
+      lifecycle.stop();
+    }
+
+    // Create one input split for each segment
+    final int numSplits = segmentDescriptors.size();
+    final HiveDruidSplit[] splits = new HiveDruidSplit[segmentDescriptors.size()];
+    for (int i = 0; i < numSplits; i++) {
+      final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i);
+      final String[] hosts = new String[locatedSD.getLocations().size()];
+      for (int j = 0; j < locatedSD.getLocations().size(); j++) {
+        hosts[j] = locatedSD.getLocations().get(j).getHost();
+      }
+      // Create partial Select query
+      final SegmentDescriptor newSD = new SegmentDescriptor(
+              locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber());
+      final SelectQuery partialQuery = query.withQuerySegmentSpec(
+              new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD)));
+      splits[i] = new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery),
+              dummyPath, hosts);
+    }
+    return splits;
+  }
+
   /* Method that splits Select query depending on the threshold so read can be
-   * parallelized */
+   * parallelized. We will only contact the Druid broker to obtain all results. */
   private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
           SelectQuery query, Path dummyPath
   ) throws IOException {
@@ -182,7 +269,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     if (isFetch) {
       // If it has a limit, we use it and we do not split the query
       return new HiveDruidSplit[] { new HiveDruidSplit(
-              address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+              new String[] {address} ) };
     }
 
     // We do not have the number of rows, thus we need to execute a
@@ -200,7 +288,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     try {
       lifecycle.start();
     } catch (Exception e) {
-      LOG.error("Lifecycle start issue", e);
+      LOG.error("Lifecycle start issue");
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
     InputStream response;
     try {
@@ -231,7 +320,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     if (metadataList.isEmpty()) {
       // There are no rows for that time range, we can submit query as it is
       return new HiveDruidSplit[] { new HiveDruidSplit(
-              address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+              new String[] {address} ) };
     }
     if (metadataList.size() != 1) {
       throw new IOException("Information about segments should have been merged");
@@ -242,9 +332,9 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
     if (numRows <= selectThreshold) {
       // We are not going to split it
-      return new HiveDruidSplit[] { new HiveDruidSplit(address,
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath
-      ) };
+      return new HiveDruidSplit[] { new HiveDruidSplit(
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+              new String[] {address} ) };
     }
 
     // If the query does not specify a timestamp, we obtain the total time using
@@ -266,12 +356,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
       try {
         lifecycle.start();
       } catch (Exception e) {
-        LOG.error("Lifecycle start issue", e);
-      }
-      try {
-        lifecycle.start();
-      } catch (Exception e) {
-        LOG.error("Lifecycle start issue", e);
+        LOG.error("Lifecycle start issue");
+        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
       }
       try {
         response = DruidStorageHandlerUtils.submitRequest(client,
@@ -318,9 +404,9 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
       // Create partial Select query
       final SelectQuery partialQuery = query.withQuerySegmentSpec(
               new MultipleIntervalSegmentSpec(newIntervals.get(i)));
-      splits[i] = new HiveDruidSplit(address,
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath
-      );
+      splits[i] = new HiveDruidSplit(
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath,
+              new String[] {address});
     }
     return splits;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/8ab1889d/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
index 861075d..58cb47a 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.druid.io;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -29,56 +30,41 @@ import org.apache.hadoop.mapred.FileSplit;
  */
 public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
 
-  private String address;
-
   private String druidQuery;
 
+  private String[] hosts;
+
   // required for deserialization
   public HiveDruidSplit() {
     super((Path) null, 0, 0, (String[]) null);
   }
 
-  public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
-    super(dummyPath, 0, 0, (String[]) null);
-    this.address = address;
+  public HiveDruidSplit(String druidQuery, Path dummyPath, String hosts[]) {
+    super(dummyPath, 0, 0, hosts);
     this.druidQuery = druidQuery;
+    this.hosts = hosts;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    out.writeUTF(address);
     out.writeUTF(druidQuery);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    address = in.readUTF();
     druidQuery = in.readUTF();
   }
 
-  @Override
-  public long getLength() {
-    return 0L;
-  }
-
-  @Override
-  public String[] getLocations() {
-    return new String[] { "" };
-  }
-
-  public String getAddress() {
-    return address;
-  }
-
   public String getDruidQuery() {
     return druidQuery;
   }
 
   @Override
   public String toString() {
-    return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
+    return "HiveDruidSplit{" + druidQuery + ", " 
+            + (hosts == null ? "empty hosts" : Arrays.toString(hosts))  + "}";
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8ab1889d/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 0d5f0b1..8d099c7 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -98,8 +98,7 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
     InputStream response;
     try {
       response = DruidStorageHandlerUtils.submitRequest(client,
-              DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query)
-      );
+              DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query));
     } catch (Exception e) {
       lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));

http://git-wip-us.apache.org/repos/asf/hive/blob/8ab1889d/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index 9b7a1da..bb4011b 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -143,8 +143,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
           + " \"descending\": \"true\", "
           + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}";
   private static final String TIMESERIES_QUERY_SPLIT =
-      "[HiveDruidSplit{localhost:8082, "
-          + "{\"queryType\":\"timeseries\","
+      "[HiveDruidSplit{{\"queryType\":\"timeseries\","
           + "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"},"
           + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]},"
           + "\"descending\":true,"
@@ -152,7 +151,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
           + "\"granularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1969-12-31T16:00:00.000-08:00\"},"
           + "\"aggregations\":[],"
           + "\"postAggregations\":[],"
-          + "\"context\":null}}]";
+          + "\"context\":null}, [localhost:8082]}]";
 
   private static final String TOPN_QUERY =
       "{  \"queryType\": \"topN\", "
@@ -177,8 +176,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
           + "  \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" "
           + " ]}";
   private static final String TOPN_QUERY_SPLIT =
-      "[HiveDruidSplit{localhost:8082, "
-          + "{\"queryType\":\"topN\","
+      "[HiveDruidSplit{{\"queryType\":\"topN\","
           + "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_data\"},"
           + "\"dimension\":{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"sample_dim\",\"outputName\":\"sample_dim\"},"
           + "\"metric\":{\"type\":\"LegacyTopNMetricSpec\",\"metric\":\"count\"},"
@@ -190,7 +188,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
           + "{\"type\":\"doubleSum\",\"name\":\"some_metric\",\"fieldName\":\"some_metric\"}],"
           + "\"postAggregations\":[],"
           + "\"context\":null,"
-          + "\"descending\":false}}]";
+          + "\"descending\":false}, [localhost:8082]}]";
 
   private static final String GROUP_BY_QUERY =
       "{  \"queryType\": \"groupBy\", "
@@ -208,8 +206,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
           + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]"
           + " }";
   private static final String GROUP_BY_QUERY_SPLIT =
-      "[HiveDruidSplit{localhost:8082, "
-          + "{\"queryType\":\"groupBy\","
+      "[HiveDruidSplit{{\"queryType\":\"groupBy\","
           + "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"},"
           + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]},"
           + "\"filter\":null,"
@@ -223,7 +220,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
           + "\"limitSpec\":{\"type\":\"default\",\"columns\":[{\"dimension\":\"country\",\"direction\":\"ascending\",\"dimensionOrder\":{\"type\":\"lexicographic\"}},"
           + "{\"dimension\":\"data_transfer\",\"direction\":\"ascending\",\"dimensionOrder\":{\"type\":\"lexicographic\"}}],\"limit\":5000},"
           + "\"context\":null,"
-          + "\"descending\":false}}]";
+          + "\"descending\":false}, [localhost:8082]}]";
 
   private static final String SELECT_QUERY =
       "{   \"queryType\": \"select\",  "
@@ -235,8 +232,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
           + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5}, "
           + " \"context\":{\"druid.query.fetch\":true}}";
   private static final String SELECT_QUERY_SPLIT =
-      "[HiveDruidSplit{localhost:8082, "
-          + "{\"queryType\":\"select\","
+      "[HiveDruidSplit{{\"queryType\":\"select\","
           + "\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"},"
           + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-01-01T00:00:00.000-08:00/2013-01-02T00:00:00.000-08:00\"]},"
           + "\"descending\":false,"
@@ -252,7 +248,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
           + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"user\",\"outputName\":\"user\"}],"
           + "\"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],"
           + "\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":5,\"fromNext\":false},"
-          + "\"context\":{\"druid.query.fetch\":true}}}]";
+          + "\"context\":{\"druid.query.fetch\":true}}, [localhost:8082]}]";
 
   @Test
   public void testTimeZone() throws Exception {
@@ -289,6 +285,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
     conf.set(Constants.DRUID_DATA_SOURCE, dataSource);
     conf.set(Constants.DRUID_QUERY_JSON, jsonQuery);
     conf.set(Constants.DRUID_QUERY_TYPE, queryType);
+    conf.setBoolean(HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE.varname, false);
     return conf;
   }