Posted to commits@hive.apache.org by jc...@apache.org on 2016/11/29 09:20:43 UTC

hive git commit: HIVE-15273: Druid http client not configured correctly (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)

Repository: hive
Updated Branches:
  refs/heads/master 755688fda -> df9b2b57a


HIVE-15273: Druid http client not configured correctly (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/df9b2b57
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/df9b2b57
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/df9b2b57

Branch: refs/heads/master
Commit: df9b2b57a05499c4848e95a24a591154640e40fb
Parents: 755688f
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Tue Nov 29 09:06:56 2016 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Tue Nov 29 09:17:45 2016 +0000

----------------------------------------------------------------------
 .../src/java/org/apache/hadoop/hive/conf/HiveConf.java  |  4 ++++
 .../hive/druid/HiveDruidQueryBasedInputFormat.java      | 10 ++++++++--
 .../hadoop/hive/druid/serde/DruidQueryRecordReader.java | 11 ++++++++++-
 .../org/apache/hadoop/hive/druid/serde/DruidSerDe.java  | 12 +++++++++++-
 4 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/df9b2b57/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9a5d604..d15b1bc 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1938,6 +1938,10 @@ public class HiveConf extends Configuration {
         "number of records of the query results is larger than this threshold, we split the query in\n" +
         "total number of rows/threshold parts across the time dimension. Note that we assume the\n" +
         "records to be split uniformly across the time dimension"),
+    HIVE_DRUID_NUM_HTTP_CONNECTION("hive.druid.http.numConnection", 20, "Number of connections used by\n" +
+        "the HTTP client."),
+    HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +
+        "client in ISO8601 format (for example P2W, PT1H30M, PT0.750S); the default is a period of 1 minute."),
 
     // For HBase storage handler
     HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,

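The read timeout is parsed with Joda-Time, exactly as the handler code below does via new Period(...).toStandardDuration(). A minimal sketch, assuming only the Joda-Time dependency Hive already ships, of how the documented formats resolve to concrete durations:

    import org.joda.time.Period;

    public class ReadTimeoutDemo {
      public static void main(String[] args) {
        // "PT1M" is the shipped default: a one-minute read timeout.
        System.out.println(new Period("PT1M").toStandardDuration().getMillis());    // 60000
        // Longer values use the same ISO8601 period syntax, e.g. one hour thirty minutes.
        System.out.println(new Period("PT1H30M").toStandardDuration().getMillis()); // 5400000
      }
    }

Note that toStandardDuration() only accepts precise periods (weeks, days, hours, minutes, seconds), so month- or year-based values would be rejected at runtime.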
http://git-wip-us.apache.org/repos/asf/hive/blob/df9b2b57/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
index 787cd52..612f853 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.joda.time.chrono.ISOChronology;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,7 +161,10 @@ public class HiveDruidQueryBasedInputFormat extends InputFormat<NullWritable, Dr
           String druidQuery, Path dummyPath) throws IOException {
     final int selectThreshold = (int) HiveConf.getIntVar(
             conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
-
+    final int numConnection = HiveConf
+            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    final Period readTimeout = new Period(
+            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
     SelectQuery query;
     try {
       query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
@@ -184,7 +188,9 @@ public class HiveDruidQueryBasedInputFormat extends InputFormat<NullWritable, Dr
     metadataBuilder.analysisTypes();
     SegmentMetadataQuery metadataQuery = metadataBuilder.build();
 
-    HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+    HttpClient client = HttpClientInit.createClient(
+            HttpClientConfig.builder().withNumConnections(numConnection)
+                    .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
     InputStream response;
     try {
       response = DruidStorageHandlerUtils.submitRequest(client,

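The fix replaces the bare HttpClientConfig.builder().build(), which left the Druid HTTP client on the library defaults, with an explicitly configured builder. A commented restatement of the new call, using the same API as the patch:

    // Both knobs come from the new HiveConf variables read above.
    HttpClient client = HttpClientInit.createClient(
            HttpClientConfig.builder()
                    .withNumConnections(numConnection)                 // pool size, default 20
                    .withReadTimeout(readTimeout.toStandardDuration()) // per-request read timeout, default PT1M
                    .build(),
            new Lifecycle());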
http://git-wip-us.apache.org/repos/asf/hive/blob/df9b2b57/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 96bcee87..fe6213b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -23,12 +23,14 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.HiveDruidSplit;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,7 +83,14 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Co
       LOG.info("Retrieving from druid using query:\n " + query);
     }
 
-    HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+    final int numConnection = HiveConf
+            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    final Period readTimeout = new Period(
+            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+
+    HttpClient client = HttpClientInit.createClient(
+            HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
+                    .withNumConnections(numConnection).build(), new Lifecycle());
     InputStream response = DruidStorageHandlerUtils.submitRequest(client,
             DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query));
 

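Because the record reader pulls both values from the job Configuration at initialization time, they can be tuned per job. A hypothetical override, using the property names introduced in HiveConf above:

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical tuning for a slow or heavily loaded Druid broker.
    Configuration conf = new Configuration();
    conf.setInt("hive.druid.http.numConnection", 50);  // widen the connection pool
    conf.set("hive.druid.http.read.timeout", "PT5M");  // allow five-minute reads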
http://git-wip-us.apache.org/repos/asf/hive/blob/df9b2b57/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 238f7a3..eb78a70 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
+import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +83,9 @@ public class DruidSerDe extends AbstractSerDe {
   private PrimitiveTypeInfo[] types;
   private ObjectInspector inspector;
 
+  private int numConnection;
+
+  private Period readTimeout;
 
   @Override
   public void initialize(Configuration configuration, Properties properties) throws SerDeException {
@@ -113,6 +117,10 @@ public class DruidSerDe extends AbstractSerDe {
         throw new SerDeException("Druid broker address not specified in configuration");
       }
 
+      numConnection = HiveConf
+              .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+      readTimeout = new Period(
+              HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
       // Infer schema
       SegmentAnalysis schemaInfo;
       try {
@@ -184,7 +192,9 @@ public class DruidSerDe extends AbstractSerDe {
   /* Submits the request and returns */
   protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
           throws SerDeException, IOException {
-    HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+    HttpClient client = HttpClientInit.createClient(
+            HttpClientConfig.builder().withNumConnections(numConnection)
+                    .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
     InputStream response;
     try {
       response = DruidStorageHandlerUtils.submitRequest(client,
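A quick smoke check of the new defaults, assuming a vanilla HiveConf with no overrides; the expected values (20 connections, a one-minute timeout) come straight from the ConfVars added above:

    HiveConf conf = new HiveConf();
    // 20 pooled connections by default.
    assert HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION) == 20;
    // The default "PT1M" parses to a 60000 ms standard duration.
    assert new Period(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT))
            .toStandardDuration().getMillis() == 60000L;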