You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the mailing-list archive.
Posted to commits@hive.apache.org by jc...@apache.org on 2016/11/29 09:20:43 UTC
hive git commit: HIVE-15273: Druid http client not configured correctly (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Repository: hive
Updated Branches:
refs/heads/master 755688fda -> df9b2b57a
HIVE-15273: Druid http client not configured correctly (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/df9b2b57
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/df9b2b57
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/df9b2b57
Branch: refs/heads/master
Commit: df9b2b57a05499c4848e95a24a591154640e40fb
Parents: 755688f
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Tue Nov 29 09:06:56 2016 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Tue Nov 29 09:17:45 2016 +0000
----------------------------------------------------------------------
.../src/java/org/apache/hadoop/hive/conf/HiveConf.java | 4 ++++
.../hive/druid/HiveDruidQueryBasedInputFormat.java | 10 ++++++++--
.../hadoop/hive/druid/serde/DruidQueryRecordReader.java | 11 ++++++++++-
.../org/apache/hadoop/hive/druid/serde/DruidSerDe.java | 12 +++++++++++-
4 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/df9b2b57/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9a5d604..d15b1bc 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1938,6 +1938,10 @@ public class HiveConf extends Configuration {
"number of records of the query results is larger than this threshold, we split the query in\n" +
"total number of rows/threshold parts across the time dimension. Note that we assume the\n" +
"records to be split uniformly across the time dimension"),
+ HIVE_DRUID_NUM_HTTP_CONNECTION("hive.druid.http.numConnection", 20, "Number of connections used by\n" +
+ "the HTTP client."),
+ HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +
+ "client in ISO8601 format (for example P2W, P3M, PT1H30M, PT0.750S), default is period of 1 minute."),
// For HBase storage handler
HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
http://git-wip-us.apache.org/repos/asf/hive/blob/df9b2b57/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
index 787cd52..612f853 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.joda.time.Interval;
+import org.joda.time.Period;
import org.joda.time.chrono.ISOChronology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,7 +161,10 @@ public class HiveDruidQueryBasedInputFormat extends InputFormat<NullWritable, Dr
String druidQuery, Path dummyPath) throws IOException {
final int selectThreshold = (int) HiveConf.getIntVar(
conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
-
+ final int numConnection = HiveConf
+ .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+ final Period readTimeout = new Period(
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
SelectQuery query;
try {
query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
@@ -184,7 +188,9 @@ public class HiveDruidQueryBasedInputFormat extends InputFormat<NullWritable, Dr
metadataBuilder.analysisTypes();
SegmentMetadataQuery metadataQuery = metadataBuilder.build();
- HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+ HttpClient client = HttpClientInit.createClient(
+ HttpClientConfig.builder().withNumConnections(numConnection)
+ .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
InputStream response;
try {
response = DruidStorageHandlerUtils.submitRequest(client,
http://git-wip-us.apache.org/repos/asf/hive/blob/df9b2b57/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 96bcee87..fe6213b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -23,12 +23,14 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
import org.apache.hadoop.hive.druid.HiveDruidSplit;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,7 +83,14 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Co
LOG.info("Retrieving from druid using query:\n " + query);
}
- HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+ final int numConnection = HiveConf
+ .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+ final Period readTimeout = new Period(
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+
+ HttpClient client = HttpClientInit.createClient(
+ HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
+ .withNumConnections(numConnection).build(), new Lifecycle());
InputStream response = DruidStorageHandlerUtils.submitRequest(client,
DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query));
http://git-wip-us.apache.org/repos/asf/hive/blob/df9b2b57/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 238f7a3..eb78a70 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
+import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +83,9 @@ public class DruidSerDe extends AbstractSerDe {
private PrimitiveTypeInfo[] types;
private ObjectInspector inspector;
+ private int numConnection;
+
+ private Period readTimeout;
@Override
public void initialize(Configuration configuration, Properties properties) throws SerDeException {
@@ -113,6 +117,10 @@ public class DruidSerDe extends AbstractSerDe {
throw new SerDeException("Druid broker address not specified in configuration");
}
+ numConnection = HiveConf
+ .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+ readTimeout = new Period(
+ HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
// Infer schema
SegmentAnalysis schemaInfo;
try {
@@ -184,7 +192,9 @@ public class DruidSerDe extends AbstractSerDe {
/* Submits the request and returns */
protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
throws SerDeException, IOException {
- HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+ HttpClient client = HttpClientInit.createClient(
+ HttpClientConfig.builder().withNumConnections(numConnection)
+ .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
InputStream response;
try {
response = DruidStorageHandlerUtils.submitRequest(client,