You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by bs...@apache.org on 2019/04/25 22:38:20 UTC
[hive] branch master updated: HIVE-21621: Update Kafka Clients to recent release 2.2.0 (reviewed by Jesus Camacho Rodriguez)
This is an automated email from the ASF dual-hosted git repository.
bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b5d95dd HIVE-21621: Update Kafka Clients to recent release 2.2.0 (reviewed by Jesus Camacho Rodriguez)
b5d95dd is described below
commit b5d95ddc658d937cc771bfb96df10fe122b03751
Author: Slim Bouguerra <bs...@apache.org>
AuthorDate: Thu Apr 25 15:36:10 2019 -0700
HIVE-21621: Update Kafka Clients to recent release 2.2.0 (reviewed by Jesus Camacho Rodriguez)
Close apache/hive#598
---
itests/qtest-druid/pom.xml | 23 ++++++++++++----------
itests/qtest/pom.xml | 10 ++++++++++
kafka-handler/pom.xml | 17 +++++++++++++---
.../hadoop/hive/kafka/HiveKafkaProducer.java | 7 ++++---
.../hadoop/hive/kafka/KafkaBrokerResource.java | 6 ++++--
5 files changed, 45 insertions(+), 18 deletions(-)
diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index 260e73d..3b9c5c3 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -43,7 +43,7 @@
<druid.derby.version>10.11.1.1</druid.derby.version>
<druid.guava.version>16.0.1</druid.guava.version>
<druid.guice.version>4.1.0</druid.guice.version>
- <kafka.version>2.0.0</kafka.version>
+ <kafka.test.version>2.0.0</kafka.test.version>
</properties>
<dependencies>
<dependency>
@@ -206,18 +206,17 @@
<artifactId>guice</artifactId>
<version>${druid.guice.version}</version>
</dependency>
- <!-- inter-project -->
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>${kafka.test.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>${kafka.version}</version>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.test.version}</version>
</dependency>
+ <!-- inter-project -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -239,8 +238,6 @@
<goals>
<goal>shade</goal>
</goals>
-
-
<configuration>
<shadeTestJar>false</shadeTestJar>
<createDependencyReducedPom>false</createDependencyReducedPom>
@@ -261,6 +258,12 @@
<exclude>*:jsp-api*</exclude>
</excludes>
</artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.kafka</pattern>
+ <shadedPattern>org.apache.kafkatests</shadedPattern>
+ </relocation>
+ </relocations>
<filters>
<filter>
<artifact>*:*</artifact>
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index 801a43d..e19a1b2 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -143,6 +143,16 @@
<artifactId>kafka-handler</artifactId>
<version>${project.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- test inter-project -->
diff --git a/kafka-handler/pom.xml b/kafka-handler/pom.xml
index f907e9d..647b6a6 100644
--- a/kafka-handler/pom.xml
+++ b/kafka-handler/pom.xml
@@ -30,7 +30,7 @@
<properties>
<hive.path.to.root>..</hive.path.to.root>
- <kafka.version>2.0.0</kafka.version>
+ <kafka.version>2.2.0</kafka.version>
</properties>
<artifactId>kafka-handler</artifactId>
@@ -94,7 +94,6 @@
<scope>test</scope>
<classifier>test</classifier>
</dependency>
-
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
@@ -109,6 +108,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.7</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
@@ -142,9 +147,15 @@
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<includes>
- <include>org.apache.kafka:*</include>
+ <include>org.apache.kafka:kafka-clients</include>
</includes>
</artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.kafka</pattern>
+ <shadedPattern>org.apache.kafkaesque</shadedPattern>
+ </relocation>
+ </relocations>
<filters>
<filter>
<artifact>*:*</artifact>
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
index 2270e08..ba27233 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -39,11 +39,12 @@ import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+
/**
* Kafka Producer with public methods to extract the producer state then resuming transaction in another process.
@@ -107,8 +108,8 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
kafkaProducer.close();
}
- @Override public void close(long timeout, TimeUnit unit) {
- kafkaProducer.close(timeout, unit);
+ @Override public void close(Duration duration) {
+ kafkaProducer.close(duration);
}
@Override public void flush() {
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index fbcbe9a..a79bf4f 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -56,8 +56,8 @@ class KafkaBrokerResource extends ExternalResource {
@Override protected void before() throws Throwable {
// Start the ZK and the Broker
LOG.info("init embedded Zookeeper");
- zkServer = new EmbeddedZookeeper();
tmpLogDir = Files.createTempDirectory("kafka-log-dir-").toAbsolutePath();
+ zkServer = new EmbeddedZookeeper();
String zkConnect = "127.0.0.1:" + zkServer.port();
LOG.info("init kafka broker");
Properties brokerProps = new Properties();
@@ -91,7 +91,9 @@ class KafkaBrokerResource extends ExternalResource {
kafkaServer.shutdown();
kafkaServer.awaitShutdown();
}
- zkServer.shutdown();
+ if (zkServer != null) {
+ zkServer.shutdown();
+ }
}
void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {