You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by bs...@apache.org on 2019/04/25 22:38:20 UTC

[hive] branch master updated: HIVE-21621: Update Kafka Clients to recent release 2.2.0 (reviewed by Jesus Camacho Rodriguez)

This is an automated email from the ASF dual-hosted git repository.

bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b5d95dd  HIVE-21621: Update Kafka Clients to recent release 2.2.0 (reviewed by Jesus Camacho Rodriguez)
b5d95dd is described below

commit b5d95ddc658d937cc771bfb96df10fe122b03751
Author: Slim Bouguerra <bs...@apache.org>
AuthorDate: Thu Apr 25 15:36:10 2019 -0700

    HIVE-21621: Update Kafka Clients to recent release 2.2.0 (reviewed by Jesus Camacho Rodriguez)
    
    Close apache/hive#598
---
 itests/qtest-druid/pom.xml                         | 23 ++++++++++++----------
 itests/qtest/pom.xml                               | 10 ++++++++++
 kafka-handler/pom.xml                              | 17 +++++++++++++---
 .../hadoop/hive/kafka/HiveKafkaProducer.java       |  7 ++++---
 .../hadoop/hive/kafka/KafkaBrokerResource.java     |  6 ++++--
 5 files changed, 45 insertions(+), 18 deletions(-)

diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index 260e73d..3b9c5c3 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -43,7 +43,7 @@
     <druid.derby.version>10.11.1.1</druid.derby.version>
     <druid.guava.version>16.0.1</druid.guava.version>
     <druid.guice.version>4.1.0</druid.guice.version>
-    <kafka.version>2.0.0</kafka.version>
+    <kafka.test.version>2.0.0</kafka.test.version>
   </properties>
       <dependencies>
         <dependency>
@@ -206,18 +206,17 @@
           <artifactId>guice</artifactId>
           <version>${druid.guice.version}</version>
         </dependency>
-        <!-- inter-project -->
         <dependency>
-          <groupId>junit</groupId>
-          <artifactId>junit</artifactId>
-          <version>${junit.version}</version>
-          <scope>test</scope>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka_2.11</artifactId>
+          <version>${kafka.test.version}</version>
         </dependency>
         <dependency>
           <groupId>org.apache.kafka</groupId>
-          <artifactId>kafka_2.11</artifactId>
-          <version>${kafka.version}</version>
+          <artifactId>kafka-clients</artifactId>
+          <version>${kafka.test.version}</version>
         </dependency>
+        <!-- inter-project -->
         <dependency>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-api</artifactId>
@@ -239,8 +238,6 @@
             <goals>
               <goal>shade</goal>
             </goals>
-
-
             <configuration>
               <shadeTestJar>false</shadeTestJar>
               <createDependencyReducedPom>false</createDependencyReducedPom>
@@ -261,6 +258,12 @@
                   <exclude>*:jsp-api*</exclude>
                 </excludes>
               </artifactSet>
+              <relocations>
+                <relocation>
+                  <pattern>org.apache.kafka</pattern>
+                  <shadedPattern>org.apache.kafkatests</shadedPattern>
+                </relocation>
+              </relocations>
               <filters>
                 <filter>
                   <artifact>*:*</artifact>
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index 801a43d..e19a1b2 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -143,6 +143,16 @@
       <artifactId>kafka-handler</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- test inter-project -->
diff --git a/kafka-handler/pom.xml b/kafka-handler/pom.xml
index f907e9d..647b6a6 100644
--- a/kafka-handler/pom.xml
+++ b/kafka-handler/pom.xml
@@ -30,7 +30,7 @@
 
   <properties>
     <hive.path.to.root>..</hive.path.to.root>
-    <kafka.version>2.0.0</kafka.version>
+    <kafka.version>2.2.0</kafka.version>
   </properties>
 
   <artifactId>kafka-handler</artifactId>
@@ -94,7 +94,6 @@
       <scope>test</scope>
       <classifier>test</classifier>
     </dependency>
-
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
@@ -109,6 +108,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.7</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
       <version>1.7.25</version>
@@ -142,9 +147,15 @@
                   <createDependencyReducedPom>false</createDependencyReducedPom>
                   <artifactSet>
                     <includes>
-                      <include>org.apache.kafka:*</include>
+                      <include>org.apache.kafka:kafka-clients</include>
                     </includes>
                   </artifactSet>
+                  <relocations>
+                    <relocation>
+                      <pattern>org.apache.kafka</pattern>
+                      <shadedPattern>org.apache.kafkaesque</shadedPattern>
+                    </relocation>
+                  </relocations>
                   <filters>
                     <filter>
                       <artifact>*:*</artifact>
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
index 2270e08..ba27233 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -39,11 +39,12 @@ import javax.annotation.Nullable;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+
 
 /**
  * Kafka Producer with public methods to extract the producer state then resuming transaction in another process.
@@ -107,8 +108,8 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
     kafkaProducer.close();
   }
 
-  @Override public void close(long timeout, TimeUnit unit) {
-    kafkaProducer.close(timeout, unit);
+  @Override public void close(Duration duration) {
+    kafkaProducer.close(duration);
   }
 
   @Override public void flush() {
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index fbcbe9a..a79bf4f 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -56,8 +56,8 @@ class KafkaBrokerResource extends ExternalResource {
   @Override protected void before() throws Throwable {
     // Start the ZK and the Broker
     LOG.info("init embedded Zookeeper");
-    zkServer = new EmbeddedZookeeper();
     tmpLogDir = Files.createTempDirectory("kafka-log-dir-").toAbsolutePath();
+    zkServer = new EmbeddedZookeeper();
     String zkConnect = "127.0.0.1:" + zkServer.port();
     LOG.info("init kafka broker");
     Properties brokerProps = new Properties();
@@ -91,7 +91,9 @@ class KafkaBrokerResource extends ExternalResource {
       kafkaServer.shutdown();
       kafkaServer.awaitShutdown();
     }
-    zkServer.shutdown();
+    if (zkServer != null) {
+      zkServer.shutdown();
+    }
   }
 
   void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {