You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/06/15 16:00:32 UTC

hive git commit: HIVE-19885 : Druid Kafka Ingestion - Allow user to set kafka consumer properties via table properties (Nishant Bangarwa via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 477f54184 -> 0363880b5


HIVE-19885 : Druid Kafka Ingestion - Allow user to set kafka consumer properties via table properties (Nishant Bangarwa via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0363880b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0363880b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0363880b

Branch: refs/heads/master
Commit: 0363880b5e1070eb862528dd48060c3bff05e6ab
Parents: 477f541
Author: Nishant Bangarwa <ni...@gmail.com>
Authored: Fri Jun 15 10:59:59 2018 -0500
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Fri Jun 15 10:59:59 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/Constants.java    |  1 +
 .../hadoop/hive/druid/DruidStorageHandler.java    | 16 ++++++++++++++--
 .../hive/cli/TestMiniDruidKafkaCliDriver.java     |  2 --
 .../hadoop/hive/cli/control/CliConfigs.java       |  2 --
 .../java/org/apache/hadoop/hive/ql/QTestUtil.java |  2 +-
 .../queries/clientpositive/druidkafkamini_basic.q |  5 +++--
 .../druid/druidkafkamini_basic.q.out              | 18 +++++++++++-------
 7 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 3d79eec..807d6bc 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -53,6 +53,7 @@ public class Constants {
   public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
 
   public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion.";
+  public static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer.";
   /* Kafka Ingestion state - valid values - START/STOP/RESET */
   public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion";
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index fc5a5fa..57e4800 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -407,8 +407,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
               getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"),
               getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"),
               getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskDuration"),
-              ImmutableMap.of(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY,
-                  kafka_servers), // Mandatory Property
+              getKafkaConsumerProperties(table, kafka_servers), // Mandatory Property
               getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"),
               getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"),
               getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"),
@@ -420,6 +419,19 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
       );
   }
 
+  /**
+   * Builds the Kafka consumer properties map from the table parameters.
+   *
+   * <p>The result always contains {@code kafka_servers} under
+   * {@code KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY}. In addition, every table parameter whose
+   * key starts with {@code Constants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX} is copied with that prefix
+   * stripped, e.g. "druid.kafka.ingestion.consumer.retries" is passed on as "retries".
+   *
+   * <p>A prefixed parameter that would resolve to the bootstrap servers key is ignored, so the
+   * explicit {@code kafka_servers} argument always wins.
+   *
+   * @param table table whose parameters are scanned for consumer properties
+   * @param kafka_servers value stored under the bootstrap servers key
+   * @return immutable map of consumer property name to value
+   */
+  private static Map<String, String> getKafkaConsumerProperties(Table table, String kafka_servers) {
+    final String prefix = Constants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX;
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    builder.put(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, kafka_servers);
+    for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
+      if (!entry.getKey().startsWith(prefix)) {
+        continue;
+      }
+      String propertyName = entry.getKey().substring(prefix.length());
+      // Adding the bootstrap servers key a second time would make ImmutableMap.Builder#build()
+      // throw IllegalArgumentException (duplicate key), so the prefixed variant is skipped.
+      if (KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY.equals(propertyName)) {
+        continue;
+      }
+      builder.put(propertyName, entry.getValue());
+    }
+    return builder.build();
+  }
+
   private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) {
     try {
       String task = JSON_MAPPER.writeValueAsString(spec);

http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
----------------------------------------------------------------------
diff --git a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
index e2d26ab..4768975 100644
--- a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
+++ b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hive.cli.control.CliConfigs;
 
 import org.junit.ClassRule;
 import org.junit.Rule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
@@ -32,7 +31,6 @@ import org.junit.runners.Parameterized.Parameters;
 import java.io.File;
 import java.util.List;
 
-@Ignore("HIVE-19509: Disable tests that are failing continuously")
 @RunWith(Parameterized.class)
 public class TestMiniDruidKafkaCliDriver {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index fddd40f..59a78d9 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -199,8 +199,6 @@ public class CliConfigs {
 
         includesFrom(testConfigProps, "druid.kafka.query.files");
 
-        excludeQuery("druidkafkamini_basic.q"); // Disabled in HIVE-19509
-
         setResultsDir("ql/src/test/results/clientpositive/druid");
         setLogDir("itests/qtest/target/tmp/log");
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 2365fb7..f19a3ad 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -772,7 +772,7 @@ public class QTestUtil {
       cleanUp();
     }
 
-    if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) {
+    if (clusterType.getCoreClusterType() == CoreClusterType.TEZ && SessionState.get().getTezSession() != null) {
       SessionState.get().getTezSession().destroy();
     }
     if (druidCluster != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
index 229a20c..814890a 100644
--- a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
+++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
@@ -9,8 +9,9 @@ CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, la
         "druid.kafka.ingestion.useEarliestOffset" = "true",
         "druid.kafka.ingestion.maxRowsInMemory" = "5",
         "druid.kafka.ingestion.startDelay" = "PT1S",
-        "druid.kafka.ingestion.taskDuration" = "PT20S",
-        "druid.kafka.ingestion.period" = "PT1S"
+        "druid.kafka.ingestion.taskDuration" = "PT60S",
+        "druid.kafka.ingestion.period" = "PT1S",
+        "druid.kafka.ingestion.consumer.retries" = "2"
         );
 
 ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');

http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
index 2e6d768..0743974 100644
--- a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
@@ -8,8 +8,9 @@ PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `
         "druid.kafka.ingestion.useEarliestOffset" = "true",
         "druid.kafka.ingestion.maxRowsInMemory" = "5",
         "druid.kafka.ingestion.startDelay" = "PT1S",
-        "druid.kafka.ingestion.taskDuration" = "PT20S",
-        "druid.kafka.ingestion.period" = "PT1S"
+        "druid.kafka.ingestion.taskDuration" = "PT60S",
+        "druid.kafka.ingestion.period" = "PT1S",
+        "druid.kafka.ingestion.consumer.retries" = "2"
         )
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
@@ -24,8 +25,9 @@ POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string,
         "druid.kafka.ingestion.useEarliestOffset" = "true",
         "druid.kafka.ingestion.maxRowsInMemory" = "5",
         "druid.kafka.ingestion.startDelay" = "PT1S",
-        "druid.kafka.ingestion.taskDuration" = "PT20S",
-        "druid.kafka.ingestion.period" = "PT1S"
+        "druid.kafka.ingestion.taskDuration" = "PT60S",
+        "druid.kafka.ingestion.period" = "PT1S",
+        "druid.kafka.ingestion.consumer.retries" = "2"
         )
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
@@ -141,7 +143,7 @@ kafkaPartitions=1
 activeTasks=[]	 	 
 publishingTasks=[]	 	 
 latestOffsets={0=10}	 	 
-minimumLag={}	 	 
+minimumLag={0=0}	 	 
 aggregateLag=0	 	 
 #### A masked pattern was here ####
 PREHOOK: query: Select count(*) FROM druid_kafka_test
@@ -346,10 +348,11 @@ STAGE PLANS:
                     druid.datasource default.druid_kafka_test
                     druid.fieldNames language,user
                     druid.fieldTypes string,string
+                    druid.kafka.ingestion.consumer.retries 2
                     druid.kafka.ingestion.maxRowsInMemory 5
                     druid.kafka.ingestion.period PT1S
                     druid.kafka.ingestion.startDelay PT1S
-                    druid.kafka.ingestion.taskDuration PT20S
+                    druid.kafka.ingestion.taskDuration PT60S
                     druid.kafka.ingestion.useEarliestOffset true
                     druid.query.granularity MINUTE
                     druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}
@@ -385,10 +388,11 @@ STAGE PLANS:
                       druid.datasource default.druid_kafka_test
                       druid.fieldNames language,user
                       druid.fieldTypes string,string
+                      druid.kafka.ingestion.consumer.retries 2
                       druid.kafka.ingestion.maxRowsInMemory 5
                       druid.kafka.ingestion.period PT1S
                       druid.kafka.ingestion.startDelay PT1S
-                      druid.kafka.ingestion.taskDuration PT20S
+                      druid.kafka.ingestion.taskDuration PT60S
                       druid.kafka.ingestion.useEarliestOffset true
                       druid.query.granularity MINUTE
                       druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}