You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/06/15 16:00:32 UTC
hive git commit: HIVE-19885 : Druid Kafka Ingestion - Allow user to set kafka consumer properties via table properties (Nishant Bangarwa via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 477f54184 -> 0363880b5
HIVE-19885 : Druid Kafka Ingestion - Allow user to set kafka consumer properties via table properties (Nishant Bangarwa via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0363880b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0363880b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0363880b
Branch: refs/heads/master
Commit: 0363880b5e1070eb862528dd48060c3bff05e6ab
Parents: 477f541
Author: Nishant Bangarwa <ni...@gmail.com>
Authored: Fri Jun 15 10:59:59 2018 -0500
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Fri Jun 15 10:59:59 2018 -0500
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/Constants.java | 1 +
.../hadoop/hive/druid/DruidStorageHandler.java | 16 ++++++++++++++--
.../hive/cli/TestMiniDruidKafkaCliDriver.java | 2 --
.../hadoop/hive/cli/control/CliConfigs.java | 2 --
.../java/org/apache/hadoop/hive/ql/QTestUtil.java | 2 +-
.../queries/clientpositive/druidkafkamini_basic.q | 5 +++--
.../druid/druidkafkamini_basic.q.out | 18 +++++++++++-------
7 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 3d79eec..807d6bc 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -53,6 +53,7 @@ public class Constants {
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion.";
+ public static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer.";
/* Kafka Ingestion state - valid values - START/STOP/RESET */
public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion";
http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index fc5a5fa..57e4800 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -407,8 +407,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"),
getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"),
getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskDuration"),
- ImmutableMap.of(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY,
- kafka_servers), // Mandatory Property
+ getKafkaConsumerProperties(table, kafka_servers), // Mandatory Property
getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"),
getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"),
getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"),
@@ -420,6 +419,19 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
);
}
+ private static Map<String, String> getKafkaConsumerProperties(Table table, String kafka_servers) {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ builder.put(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, kafka_servers);
+ for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
+ if (entry.getKey().startsWith(Constants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX)) {
+ String propertyName = entry.getKey()
+ .substring(Constants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX.length());
+ builder.put(propertyName, entry.getValue());
+ }
+ }
+ return builder.build();
+ }
+
private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) {
try {
String task = JSON_MAPPER.writeValueAsString(spec);
http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
----------------------------------------------------------------------
diff --git a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
index e2d26ab..4768975 100644
--- a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
+++ b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hive.cli.control.CliConfigs;
import org.junit.ClassRule;
import org.junit.Rule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
@@ -32,7 +31,6 @@ import org.junit.runners.Parameterized.Parameters;
import java.io.File;
import java.util.List;
-@Ignore("HIVE-19509: Disable tests that are failing continuously")
@RunWith(Parameterized.class)
public class TestMiniDruidKafkaCliDriver {
http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index fddd40f..59a78d9 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -199,8 +199,6 @@ public class CliConfigs {
includesFrom(testConfigProps, "druid.kafka.query.files");
- excludeQuery("druidkafkamini_basic.q"); // Disabled in HIVE-19509
-
setResultsDir("ql/src/test/results/clientpositive/druid");
setLogDir("itests/qtest/target/tmp/log");
http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 2365fb7..f19a3ad 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -772,7 +772,7 @@ public class QTestUtil {
cleanUp();
}
- if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) {
+ if (clusterType.getCoreClusterType() == CoreClusterType.TEZ && SessionState.get().getTezSession() != null) {
SessionState.get().getTezSession().destroy();
}
if (druidCluster != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
index 229a20c..814890a 100644
--- a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
+++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
@@ -9,8 +9,9 @@ CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, la
"druid.kafka.ingestion.useEarliestOffset" = "true",
"druid.kafka.ingestion.maxRowsInMemory" = "5",
"druid.kafka.ingestion.startDelay" = "PT1S",
- "druid.kafka.ingestion.taskDuration" = "PT20S",
- "druid.kafka.ingestion.period" = "PT1S"
+ "druid.kafka.ingestion.taskDuration" = "PT60S",
+ "druid.kafka.ingestion.period" = "PT1S",
+ "druid.kafka.ingestion.consumer.retries" = "2"
);
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
http://git-wip-us.apache.org/repos/asf/hive/blob/0363880b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
index 2e6d768..0743974 100644
--- a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
@@ -8,8 +8,9 @@ PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `
"druid.kafka.ingestion.useEarliestOffset" = "true",
"druid.kafka.ingestion.maxRowsInMemory" = "5",
"druid.kafka.ingestion.startDelay" = "PT1S",
- "druid.kafka.ingestion.taskDuration" = "PT20S",
- "druid.kafka.ingestion.period" = "PT1S"
+ "druid.kafka.ingestion.taskDuration" = "PT60S",
+ "druid.kafka.ingestion.period" = "PT1S",
+ "druid.kafka.ingestion.consumer.retries" = "2"
)
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
@@ -24,8 +25,9 @@ POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string,
"druid.kafka.ingestion.useEarliestOffset" = "true",
"druid.kafka.ingestion.maxRowsInMemory" = "5",
"druid.kafka.ingestion.startDelay" = "PT1S",
- "druid.kafka.ingestion.taskDuration" = "PT20S",
- "druid.kafka.ingestion.period" = "PT1S"
+ "druid.kafka.ingestion.taskDuration" = "PT60S",
+ "druid.kafka.ingestion.period" = "PT1S",
+ "druid.kafka.ingestion.consumer.retries" = "2"
)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
@@ -141,7 +143,7 @@ kafkaPartitions=1
activeTasks=[]
publishingTasks=[]
latestOffsets={0=10}
-minimumLag={}
+minimumLag={0=0}
aggregateLag=0
#### A masked pattern was here ####
PREHOOK: query: Select count(*) FROM druid_kafka_test
@@ -346,10 +348,11 @@ STAGE PLANS:
druid.datasource default.druid_kafka_test
druid.fieldNames language,user
druid.fieldTypes string,string
+ druid.kafka.ingestion.consumer.retries 2
druid.kafka.ingestion.maxRowsInMemory 5
druid.kafka.ingestion.period PT1S
druid.kafka.ingestion.startDelay PT1S
- druid.kafka.ingestion.taskDuration PT20S
+ druid.kafka.ingestion.taskDuration PT60S
druid.kafka.ingestion.useEarliestOffset true
druid.query.granularity MINUTE
druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}
@@ -385,10 +388,11 @@ STAGE PLANS:
druid.datasource default.druid_kafka_test
druid.fieldNames language,user
druid.fieldTypes string,string
+ druid.kafka.ingestion.consumer.retries 2
druid.kafka.ingestion.maxRowsInMemory 5
druid.kafka.ingestion.period PT1S
druid.kafka.ingestion.startDelay PT1S
- druid.kafka.ingestion.taskDuration PT20S
+ druid.kafka.ingestion.taskDuration PT60S
druid.kafka.ingestion.useEarliestOffset true
druid.query.granularity MINUTE
druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}