Posted to commits@hive.apache.org by jc...@apache.org on 2017/02/22 10:18:31 UTC
[2/2] hive git commit: HIVE-15928: Parallelization of Select queries in Druid handler (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
HIVE-15928: Parallelization of Select queries in Druid handler (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8ab1889d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8ab1889d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8ab1889d
Branch: refs/heads/master
Commit: 8ab1889dd9afe958e96cc62fc973771f61cadcba
Parents: 8973d2c
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu Feb 16 14:40:41 2017 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed Feb 22 10:17:28 2017 +0000
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 10 +-
.../druid/io/DruidQueryBasedInputFormat.java | 124 ++++++++++++++++---
.../hadoop/hive/druid/io/HiveDruidSplit.java | 30 ++---
.../druid/serde/DruidQueryRecordReader.java | 3 +-
.../TestHiveDruidQueryBasedInputFormat.java | 21 ++--
5 files changed, 132 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8ab1889d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7c88f4f..3777fa9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1933,12 +1933,20 @@ public class HiveConf extends Configuration {
HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS("hive.druid.coordinator.address.default", "localhost:8081",
"Address of the Druid coordinator. It is used to check the load status of newly created segments"
),
+ HIVE_DRUID_SELECT_DISTRIBUTE("hive.druid.select.distribute", true,
+ "If it is set to true, we distribute the execution of Druid Select queries. Concretely, we retrieve\n" +
+ "the result for Select queries directly from the Druid nodes containing the segments data.\n" +
+ "In particular, first we contact the Druid broker node to obtain the nodes containing the segments\n" +
+ "for the given query, and then we contact those nodes to retrieve the results for the query.\n" +
+ "If it is set to false, we do not execute the Select queries in a distributed fashion. Instead, results\n" +
+ "for those queries are returned by the Druid broker node."),
HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
+ "Takes only effect when hive.druid.select.distribute is set to false. \n" +
"When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
"per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
"number of records of the query results is larger than this threshold, we split the query in\n" +
"total number of rows/threshold parts across the time dimension. Note that we assume the\n" +
- "records to be split uniformly across the time dimension"),
+ "records to be split uniformly across the time dimension."),
HIVE_DRUID_NUM_HTTP_CONNECTION("hive.druid.http.numConnection", 20, "Number of connections used by\n" +
"the HTTP client."),
HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +
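To make the interplay of the two properties concrete, here is a minimal sketch
(not part of this commit) that reads them back the same way the handler code
below does; HiveConf.getBoolVar/getIntVar are the getters used in
DruidQueryBasedInputFormat:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.conf.HiveConf;

  public class DruidSelectConfExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // hive.druid.select.distribute takes precedence: when true, Select
      // queries are fanned out to the nodes holding the segments and the
      // row-count threshold below is never consulted.
      boolean distribute = HiveConf.getBoolVar(conf,
          HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE); // default: true
      // Only relevant on the broker-only path (distribute == false).
      int threshold = HiveConf.getIntVar(conf,
          HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);  // default: 10000
      System.out.println("distribute=" + distribute
          + ", threshold=" + threshold);
    }
  }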
http://git-wip-us.apache.org/repos/asf/hive/blob/8ab1889d/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 8b37840..0b35428 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.druid.io;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.chrono.ISOChronology;
@@ -60,23 +62,28 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.collect.Lists;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
+import com.metamx.http.client.Request;
import io.druid.query.BaseQuery;
import io.druid.query.Druids;
import io.druid.query.Druids.SegmentMetadataQueryBuilder;
import io.druid.query.Druids.SelectQueryBuilder;
import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.LocatedSegmentDescriptor;
import io.druid.query.Query;
import io.druid.query.Result;
+import io.druid.query.SegmentDescriptor;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQuery;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryResultValue;
@@ -143,12 +150,17 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
case Query.TIMESERIES:
case Query.TOPN:
case Query.GROUP_BY:
- return new HiveDruidSplit[] { new HiveDruidSplit(address,
- deserializeSerialize(druidQuery), paths[0]) };
+ return new HiveDruidSplit[] { new HiveDruidSplit(deserializeSerialize(druidQuery),
+ paths[0], new String[] {address}) };
case Query.SELECT:
SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
druidQuery, SelectQuery.class);
- return splitSelectQuery(conf, address, selectQuery, paths[0]);
+ boolean distributed = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE);
+ if (distributed) {
+ return distributeSelectQuery(conf, address, selectQuery, paths[0]);
+ } else {
+ return splitSelectQuery(conf, address, selectQuery, paths[0]);
+ }
default:
throw new IOException("Druid query type not recognized");
}
@@ -166,8 +178,83 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
}
+ /* New method that distributes the Select query by creating splits containing
+ * information about different Druid nodes that have the data for the given
+ * query. */
+ private static HiveDruidSplit[] distributeSelectQuery(Configuration conf, String address,
+ SelectQuery query, Path dummyPath) throws IOException {
+ // If it has a limit, we use it and we do not distribute the query
+ final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+ if (isFetch) {
+ return new HiveDruidSplit[] { new HiveDruidSplit(
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+ new String[]{address} ) };
+ }
+
+ // Properties from configuration
+ final int numConnection = HiveConf.getIntVar(conf,
+ HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+ final Period readTimeout = new Period(
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+
+ // Create request to obtain nodes that are holding data for the given datasource and intervals
+ final Lifecycle lifecycle = new Lifecycle();
+ final HttpClient client = HttpClientInit.createClient(
+ HttpClientConfig.builder().withNumConnections(numConnection)
+ .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
+ try {
+ lifecycle.start();
+ } catch (Exception e) {
+ LOG.error("Lifecycle start issue");
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ final String intervals =
+ StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
+ final String request = String.format(
+ "http://%s/druid/v2/datasources/%s/candidates?intervals=%s",
+ address, query.getDataSource().getNames().get(0), intervals);
+ final InputStream response;
+ try {
+ response = DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, new URL(request)));
+ } catch (Exception e) {
+ lifecycle.stop();
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+
+ // Retrieve results
+ final List<LocatedSegmentDescriptor> segmentDescriptors;
+ try {
+ segmentDescriptors = DruidStorageHandlerUtils.JSON_MAPPER.readValue(response,
+ new TypeReference<List<LocatedSegmentDescriptor>>() {});
+ } catch (Exception e) {
+ response.close();
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ } finally {
+ lifecycle.stop();
+ }
+
+ // Create one input split for each segment
+ final int numSplits = segmentDescriptors.size();
+ final HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
+ for (int i = 0; i < numSplits; i++) {
+ final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i);
+ final String[] hosts = new String[locatedSD.getLocations().size()];
+ for (int j = 0; j < locatedSD.getLocations().size(); j++) {
+ hosts[j] = locatedSD.getLocations().get(j).getHost();
+ }
+ // Create partial Select query
+ final SegmentDescriptor newSD = new SegmentDescriptor(
+ locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber());
+ final SelectQuery partialQuery = query.withQuerySegmentSpec(
+ new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD)));
+ splits[i] = new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery),
+ dummyPath, hosts);
+ }
+ return splits;
+ }
+
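For concreteness, with the broker at localhost:8082 (the address used by the
tests further down) and the wikipedia datasource from the Select query there,
the candidates request built above would look as follows (illustrative values,
not output produced by this commit):

  http://localhost:8082/druid/v2/datasources/wikipedia/candidates?intervals=2013-01-01T00:00:00.000/2013-01-02T00:00:00.000

The broker replies with a JSON list of located segment descriptors; the loop
above reads each entry's interval, version, partition number, and server
locations to build one locality-aware split per segment.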
/* Method that splits Select query depending on the threshold so read can be
- * parallelized */
+ * parallelized. We will only contact the Druid broker to obtain all results. */
private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
SelectQuery query, Path dummyPath
) throws IOException {
@@ -182,7 +269,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
if (isFetch) {
// If it has a limit, we use it and we do not split the query
return new HiveDruidSplit[] { new HiveDruidSplit(
- address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+ new String[] {address} ) };
}
// We do not have the number of rows, thus we need to execute a
@@ -200,7 +288,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
try {
lifecycle.start();
} catch (Exception e) {
- LOG.error("Lifecycle start issue", e);
+ LOG.error("Lifecycle start issue");
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
}
InputStream response;
try {
@@ -231,7 +320,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
if (metadataList.isEmpty()) {
// There are no rows for that time range, we can submit query as it is
return new HiveDruidSplit[] { new HiveDruidSplit(
- address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+ new String[] {address} ) };
}
if (metadataList.size() != 1) {
throw new IOException("Information about segments should have been merged");
@@ -242,9 +332,9 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
if (numRows <= selectThreshold) {
// We are not going to split it
- return new HiveDruidSplit[] { new HiveDruidSplit(address,
- DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath
- ) };
+ return new HiveDruidSplit[] { new HiveDruidSplit(
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+ new String[] {address} ) };
}
// If the query does not specify a timestamp, we obtain the total time using
@@ -266,12 +356,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
try {
lifecycle.start();
} catch (Exception e) {
- LOG.error("Lifecycle start issue", e);
- }
- try {
- lifecycle.start();
- } catch (Exception e) {
- LOG.error("Lifecycle start issue", e);
+ LOG.error("Lifecycle start issue");
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
}
try {
response = DruidStorageHandlerUtils.submitRequest(client,
@@ -318,9 +404,9 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
// Create partial Select query
final SelectQuery partialQuery = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(newIntervals.get(i)));
- splits[i] = new HiveDruidSplit(address,
- DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath
- );
+ splits[i] = new HiveDruidSplit(
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath,
+ new String[] {address});
}
return splits;
}
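The hunks above keep the broker-only splitting path. The actual interval
arithmetic lives in code outside the context of these hunks, so the following
is a sketch under the assumption (stated in the configuration text above) that
the estimated row count is divided by the threshold, rounding up:

  public class DruidSelectSplitMath {
    public static void main(String[] args) {
      long numRows = 100_000L;      // estimated size of the complete result
      int selectThreshold = 10_000; // hive.druid.select.threshold
      // Assumed rounding; the real code is not shown in this diff.
      int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
      // The query interval is divided into numSplits sub-intervals, relying
      // on the documented assumption that records are spread uniformly
      // across the time dimension.
      System.out.println(numSplits + " splits"); // prints "10 splits"
    }
  }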
http://git-wip-us.apache.org/repos/asf/hive/blob/8ab1889d/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
index 861075d..58cb47a 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.druid.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Arrays;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
@@ -29,56 +30,41 @@ import org.apache.hadoop.mapred.FileSplit;
*/
public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
- private String address;
-
private String druidQuery;
+ private String[] hosts;
+
// required for deserialization
public HiveDruidSplit() {
super((Path) null, 0, 0, (String[]) null);
}
- public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
- super(dummyPath, 0, 0, (String[]) null);
- this.address = address;
+ public HiveDruidSplit(String druidQuery, Path dummyPath, String[] hosts) {
+ super(dummyPath, 0, 0, hosts);
this.druidQuery = druidQuery;
+ this.hosts = hosts;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
- out.writeUTF(address);
out.writeUTF(druidQuery);
+ // FileSplit does not serialize its hosts, so carry them explicitly
+ out.writeInt(hosts.length);
+ for (String host : hosts) {
+ out.writeUTF(host);
+ }
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- address = in.readUTF();
druidQuery = in.readUTF();
+ hosts = new String[in.readInt()];
+ for (int i = 0; i < hosts.length; i++) {
+ hosts[i] = in.readUTF();
+ }
}
- @Override
- public long getLength() {
- return 0L;
- }
-
@Override
public String[] getLocations() {
- return new String[] { "" };
+ // Return the carried hosts so locality survives the write/readFields
+ // round trip; FileSplit alone would return an empty array after
+ // deserialization.
+ return hosts == null ? new String[] {} : hosts;
}
-
- public String getAddress() {
- return address;
- }
-
public String getDruidQuery() {
return druidQuery;
}
@Override
public String toString() {
- return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
+ return "HiveDruidSplit{" + druidQuery + ", "
+ + (hosts == null ? "empty hosts" : Arrays.toString(hosts)) + "}";
}
}
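Since the constructor now hands the hosts to FileSplit and write/readFields
round-trip them, locality information is available wherever the split
travels. A minimal usage sketch with a hypothetical host name:

  import org.apache.hadoop.fs.Path;

  public class HiveDruidSplitExample {
    public static void main(String[] args) throws Exception {
      // "historical-1.example.com" is a hypothetical Druid data node; in
      // practice the hosts come from the broker's candidates response.
      HiveDruidSplit split = new HiveDruidSplit("{\"queryType\":\"select\"}",
          new Path("/tmp/dummy"), new String[] { "historical-1.example.com" });
      // getLocations() is what the Hadoop scheduler and the record reader
      // below consult to decide which node to contact.
      for (String location : split.getLocations()) {
        System.out.println(location);
      }
    }
  }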
http://git-wip-us.apache.org/repos/asf/hive/blob/8ab1889d/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 0d5f0b1..8d099c7 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -98,8 +98,7 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
InputStream response;
try {
response = DruidStorageHandlerUtils.submitRequest(client,
- DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query)
- );
+ DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query));
} catch (Exception e) {
lifecycle.stop();
throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
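Note that the reader now contacts getLocations()[0], the first node known to
hold the segment, which assumes a non-empty locations array on the task side
(hence the host round-trip in HiveDruidSplit above). A defensive variant could
fall back to the broker; sketch only, with a hypothetical brokerAddress
parameter that is not part of this commit:

  // Prefer the first segment location; fall back to a configured broker
  // address when a split carries no locations ("brokerAddress" is
  // hypothetical, not part of this commit).
  private static String chooseNode(String[] locations, String brokerAddress) {
    return (locations != null && locations.length > 0)
        ? locations[0] : brokerAddress;
  }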
http://git-wip-us.apache.org/repos/asf/hive/blob/8ab1889d/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index 9b7a1da..bb4011b 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -143,8 +143,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
+ " \"descending\": \"true\", "
+ " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}";
private static final String TIMESERIES_QUERY_SPLIT =
- "[HiveDruidSplit{localhost:8082, "
- + "{\"queryType\":\"timeseries\","
+ "[HiveDruidSplit{{\"queryType\":\"timeseries\","
+ "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"},"
+ "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]},"
+ "\"descending\":true,"
@@ -152,7 +151,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
+ "\"granularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1969-12-31T16:00:00.000-08:00\"},"
+ "\"aggregations\":[],"
+ "\"postAggregations\":[],"
- + "\"context\":null}}]";
+ + "\"context\":null}, [localhost:8082]}]";
private static final String TOPN_QUERY =
"{ \"queryType\": \"topN\", "
@@ -177,8 +176,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
+ " \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" "
+ " ]}";
private static final String TOPN_QUERY_SPLIT =
- "[HiveDruidSplit{localhost:8082, "
- + "{\"queryType\":\"topN\","
+ "[HiveDruidSplit{{\"queryType\":\"topN\","
+ "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_data\"},"
+ "\"dimension\":{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"sample_dim\",\"outputName\":\"sample_dim\"},"
+ "\"metric\":{\"type\":\"LegacyTopNMetricSpec\",\"metric\":\"count\"},"
@@ -190,7 +188,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
+ "{\"type\":\"doubleSum\",\"name\":\"some_metric\",\"fieldName\":\"some_metric\"}],"
+ "\"postAggregations\":[],"
+ "\"context\":null,"
- + "\"descending\":false}}]";
+ + "\"descending\":false}, [localhost:8082]}]";
private static final String GROUP_BY_QUERY =
"{ \"queryType\": \"groupBy\", "
@@ -208,8 +206,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
+ " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]"
+ " }";
private static final String GROUP_BY_QUERY_SPLIT =
- "[HiveDruidSplit{localhost:8082, "
- + "{\"queryType\":\"groupBy\","
+ "[HiveDruidSplit{{\"queryType\":\"groupBy\","
+ "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"},"
+ "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]},"
+ "\"filter\":null,"
@@ -223,7 +220,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
+ "\"limitSpec\":{\"type\":\"default\",\"columns\":[{\"dimension\":\"country\",\"direction\":\"ascending\",\"dimensionOrder\":{\"type\":\"lexicographic\"}},"
+ "{\"dimension\":\"data_transfer\",\"direction\":\"ascending\",\"dimensionOrder\":{\"type\":\"lexicographic\"}}],\"limit\":5000},"
+ "\"context\":null,"
- + "\"descending\":false}}]";
+ + "\"descending\":false}, [localhost:8082]}]";
private static final String SELECT_QUERY =
"{ \"queryType\": \"select\", "
@@ -235,8 +232,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
+ " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5}, "
+ " \"context\":{\"druid.query.fetch\":true}}";
private static final String SELECT_QUERY_SPLIT =
- "[HiveDruidSplit{localhost:8082, "
- + "{\"queryType\":\"select\","
+ "[HiveDruidSplit{{\"queryType\":\"select\","
+ "\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"},"
+ "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-01-01T00:00:00.000-08:00/2013-01-02T00:00:00.000-08:00\"]},"
+ "\"descending\":false,"
@@ -252,7 +248,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
+ "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"user\",\"outputName\":\"user\"}],"
+ "\"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],"
+ "\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":5,\"fromNext\":false},"
- + "\"context\":{\"druid.query.fetch\":true}}}]";
+ + "\"context\":{\"druid.query.fetch\":true}}, [localhost:8082]}]";
@Test
public void testTimeZone() throws Exception {
@@ -289,6 +285,7 @@ public class TestHiveDruidQueryBasedInputFormat extends TestCase {
conf.set(Constants.DRUID_DATA_SOURCE, dataSource);
conf.set(Constants.DRUID_QUERY_JSON, jsonQuery);
conf.set(Constants.DRUID_QUERY_TYPE, queryType);
+ conf.setBoolean(HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE.varname, false);
return conf;
}