Posted to commits@hive.apache.org by bs...@apache.org on 2019/08/21 17:56:49 UTC

[hive] branch master updated: HIVE-22125: Update Kafka clients to 2.3 (Slim B via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cf38b6  HIVE-22125: Update Kafka clients to 2.3 (Slim B via Ashutosh Chauhan)
1cf38b6 is described below

commit 1cf38b648e9517cbd84ee7faada72996bc757e56
Author: Slim Bouguerra <bs...@apache.org>
AuthorDate: Wed Aug 21 10:56:40 2019 -0700

    HIVE-22125: Update Kafka clients to 2.3 (Slim B via Ashutosh Chauhan)
---
 kafka-handler/pom.xml                              |  2 +-
 .../hadoop/hive/kafka/HiveKafkaProducer.java       |  9 ++-----
 .../hadoop/hive/kafka/KafkaStorageHandler.java     |  2 +-
 .../hive/kafka/TransactionalKafkaWriter.java       |  6 +++--
 .../hadoop/hive/kafka/HiveKafkaProducerTest.java   | 29 ++++++++++++++--------
 .../hadoop/hive/kafka/KafkaBrokerResource.java     | 10 ++++----
 .../hive/kafka/TransactionalKafkaWriterTest.java   |  4 +--
 7 files changed, 33 insertions(+), 29 deletions(-)

diff --git a/kafka-handler/pom.xml b/kafka-handler/pom.xml
index 647b6a6..02f5a27 100644
--- a/kafka-handler/pom.xml
+++ b/kafka-handler/pom.xml
@@ -30,7 +30,7 @@
 
   <properties>
     <hive.path.to.root>..</hive.path.to.root>
-    <kafka.version>2.2.0</kafka.version>
+    <kafka.version>2.3.0</kafka.version>
   </properties>
 
   <artifactId>kafka-handler</artifactId>
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
index ba27233..9f85681 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -45,7 +45,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
-
 /**
  * Kafka Producer with public methods to extract the producer state then resuming transaction in another process.
  * This Producer is to be used only if you need to extract the transaction state and resume it from a different process.
@@ -133,15 +132,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
 
     Object transactionManager = getValue(kafkaProducer, "transactionManager");
 
-    Object nextSequence = getValue(transactionManager, "nextSequence");
-    Object lastAckedSequence = getValue(transactionManager, "lastAckedSequence");
-
+    Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
     invoke(transactionManager,
         "transitionTo",
         getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-    invoke(nextSequence, "clear");
-    invoke(lastAckedSequence, "clear");
-
+    invoke(topicPartitionBookkeeper, "reset");
     Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
     setValue(producerIdAndEpoch, "producerId", producerId);
     setValue(producerIdAndEpoch, "epoch", epoch);
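
Note on the hunk above: Kafka 2.3 reworked the producer's internal TransactionManager, folding the per-partition sequence state that previously lived in the nextSequence and lastAckedSequence maps into a single topicPartitionBookkeeper object, so the reflective resume path now resets that one object instead of clearing two maps. The getValue/invoke/setValue/getEnum helpers are defined elsewhere in HiveKafkaProducer; the following is only a minimal sketch of what such reflection helpers look like, with bodies that are an assumption rather than the actual Hive code (the real helpers would also need to handle superclass fields and enum-typed arguments):

    import java.lang.reflect.Field;
    import java.lang.reflect.Method;
    import java.util.Arrays;

    final class ReflectionHelpersSketch {
      // Read a private field, e.g. transactionManager.topicPartitionBookkeeper.
      static Object getValue(Object target, String fieldName) {
        try {
          Field field = target.getClass().getDeclaredField(fieldName);
          field.setAccessible(true);
          return field.get(target);
        } catch (ReflectiveOperationException e) {
          throw new RuntimeException(e);
        }
      }

      // Invoke a private method, e.g. topicPartitionBookkeeper.reset();
      // only the simple case of exact parameter types is covered here.
      static Object invoke(Object target, String methodName, Object... args) {
        try {
          Class<?>[] argTypes = Arrays.stream(args).map(Object::getClass).toArray(Class<?>[]::new);
          Method method = target.getClass().getDeclaredMethod(methodName, argTypes);
          method.setAccessible(true);
          return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
          throw new RuntimeException(e);
        }
      }
    }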
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
index d87f245..c3ddbb5 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -302,7 +302,7 @@ import java.util.function.Predicate;
       RetryUtils.retry(buildProducersTask, isRetrayable, cleanUpTheMap, maxTries, "Error while Building Producers");
     } catch (Exception e) {
       // Can not go further
-      LOG.error("Can not fetch build produces due [{}]", e.getMessage());
+      LOG.error("Can not fetch build produces due [{}]", e);
       throw new MetaException(e.getMessage());
     }
 
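
Note on the hunk above: e.getMessage() drops the stack trace (and can be null), while SLF4J treats a trailing Throwable argument specially and prints the full trace; with a trailing Throwable the [{}] placeholder is simply left unfilled. A short illustration of the difference, with placeholder names:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class Slf4jThrowableSketch {
      private static final Logger LOG = LoggerFactory.getLogger(Slf4jThrowableSketch.class);

      void report(Exception e) {
        // Message text only; the stack trace is lost and getMessage() may be null.
        LOG.error("Building producers failed due to [{}]", e.getMessage());
        // Trailing Throwable: SLF4J logs the message plus the full stack trace.
        LOG.error("Building producers failed", e);
      }
    }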
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
index fb4d034..e4015fc 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -63,6 +64,7 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
 
   private static final Logger LOG = LoggerFactory.getLogger(TransactionalKafkaWriter.class);
   private static final String TRANSACTION_DIR = "transaction_states";
+  private static final Duration DURATION_0 = Duration.ofMillis(0);
 
   private final String topic;
   private final HiveKafkaProducer<byte[], byte[]> producer;
@@ -178,7 +180,7 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
       } catch (Exception e) {
         LOG.error("Aborting Transaction {} failed due to [{}]", writerIdTopicId, e.getMessage());
       }
-      producer.close(0, TimeUnit.MILLISECONDS);
+      producer.close(DURATION_0);
       return;
     }
 
@@ -209,11 +211,11 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
       persistTxState();
     }
     checkExceptions();
-    producer.close();
     LOG.info("Closed writerId [{}], Sent [{}] records to Topic [{}]",
         producer.getTransactionalId(),
         sentRecords,
         topic);
+    producer.close(Duration.ZERO);
   }
 
   private void commitTransaction() {
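
Note on the hunks above: the 2.x clients deprecate Producer.close(long, TimeUnit) in favor of close(Duration), hence the new DURATION_0 constant on the abort path and Duration.ZERO on the normal path; the close call is also moved after the final log statement so the summary is emitted before the producer shuts down. A minimal sketch of the call-site change, assuming no wait for in-flight requests is wanted:

    import java.time.Duration;
    import org.apache.kafka.clients.producer.Producer;

    final class ProducerCloseSketch {
      static void closeWithoutWaiting(Producer<byte[], byte[]> producer) {
        // Deprecated pre-2.x style: producer.close(0, TimeUnit.MILLISECONDS);
        producer.close(Duration.ZERO);
      }
    }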
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
index db2515c..8c9ed5f 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -56,16 +57,17 @@ import java.util.stream.IntStream;
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducerTest.class);
   private static final int RECORD_NUMBER = 17384;
-  private static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8"));
+  private static final byte[] KEY_BYTES = "KEY".getBytes(StandardCharsets.UTF_8);
   private static final KafkaBrokerResource KAFKA_BROKER_RESOURCE = new KafkaBrokerResource();
 
   private static final String TOPIC = "test-tx-producer";
   private static final List<ProducerRecord<byte[], byte[]>>
       RECORDS =
       IntStream.range(0, RECORD_NUMBER).mapToObj(number -> {
-        final byte[] value = ("VALUE-" + Integer.toString(number)).getBytes(Charset.forName("UTF-8"));
+        final byte[] value = ("VALUE-" + number).getBytes(StandardCharsets.UTF_8);
         return new ProducerRecord<>(TOPIC, value, KEY_BYTES);
       }).collect(Collectors.toList());
+  private static final short MAX_ATTEMPTS = 500;
 
   @BeforeClass public static void setupCluster() throws Throwable {
     KAFKA_BROKER_RESOURCE.before();
@@ -109,7 +111,8 @@ import java.util.stream.IntStream;
     consumer = null;
   }
 
-  @Test public void resumeTransaction() {
+
+  @Test(timeout = 120_000) public void resumeTransaction() {
     producer.initTransactions();
     producer.beginTransaction();
     long pid = producer.getProducerId();
@@ -119,7 +122,7 @@ import java.util.stream.IntStream;
     //noinspection unchecked
     RECORDS.forEach(producer::send);
     producer.flush();
-    producer.close();
+    producer.close(Duration.ZERO);
 
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(pid, epoch);
@@ -132,11 +135,15 @@ import java.util.stream.IntStream;
     consumer.seekToBeginning(assignment);
     long numRecords = 0;
     @SuppressWarnings("unchecked") final List<ConsumerRecord<byte[], byte[]>> actualRecords = new ArrayList();
-    while (numRecords < RECORD_NUMBER) {
-      ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+    short attempts = 0;
+    while (numRecords < RECORD_NUMBER && attempts++ < MAX_ATTEMPTS) {
+      ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1_000));
       actualRecords.addAll(consumerRecords.records(new TopicPartition(TOPIC, 0)));
       numRecords += consumerRecords.count();
     }
+    if (attempts >= MAX_ATTEMPTS) {
+      Assert.fail("Reached max attempts and total number of records is " + numRecords);
+    }
     Assert.assertEquals("Size matters !!", RECORDS.size(), actualRecords.size());
     Iterator<ProducerRecord<byte[], byte[]>> expectedIt = RECORDS.iterator();
     Iterator<ConsumerRecord<byte[], byte[]>> actualIt = actualRecords.iterator();
@@ -152,28 +159,28 @@ import java.util.stream.IntStream;
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(3434L, (short) 12);
     secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
-    secondProducer.close();
+    secondProducer.close(Duration.ZERO);
   }
 
   @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpoch() {
     producer.initTransactions();
     producer.beginTransaction();
     long pid = producer.getProducerId();
-    producer.close();
+    producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(pid, (short) 12);
     secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
-    secondProducer.close();
+    secondProducer.close(Duration.ZERO);
   }
 
   @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongPID() {
     producer.initTransactions();
     producer.beginTransaction();
     short epoch = producer.getEpoch();
-    producer.close();
+    producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(45L, epoch);
     secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
-    secondProducer.close();
+    secondProducer.close(Duration.ZERO);
   }
 }
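
Note on the test changes above: the unbounded consume loop could spin forever if the broker never delivered all records, so the patch pairs a JUnit timeout with a MAX_ATTEMPTS bound on the poll loop and fails with the observed record count. A generic version of that pattern, with illustrative names not taken from the patch:

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    final class BoundedDrainSketch {
      // Poll until `expected` records arrive or `maxAttempts` polls have elapsed.
      static long drain(Consumer<byte[], byte[]> consumer, long expected, int maxAttempts) {
        long seen = 0;
        int attempts = 0;
        while (seen < expected && attempts++ < maxAttempts) {
          ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(1_000));
          seen += batch.count();
        }
        return seen; // the caller asserts seen == expected and reports the count on failure
      }
    }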
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index a79bf4f..287fff4 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -82,11 +82,6 @@ class KafkaBrokerResource extends ExternalResource {
    */
   @Override protected void after() {
     super.after();
-    try {
-      FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
-    } catch (IOException e) {
-      LOG.error("Error cleaning " + tmpLogDir.toString(), e);
-    }
     if (kafkaServer != null) {
       kafkaServer.shutdown();
       kafkaServer.awaitShutdown();
@@ -94,6 +89,11 @@ class KafkaBrokerResource extends ExternalResource {
     if (zkServer != null) {
       zkServer.shutdown();
     }
+    try {
+      FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
+    } catch (IOException e) {
+      LOG.warn("Did not clean " + tmpLogDir.toString(), e);
+    }
   }
 
   void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {
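
Note on the hunks above: deleting the log directory before shutting the broker down can race with files the broker still holds open, so teardown now stops Kafka and ZooKeeper first and removes the directory last, downgrading a failed delete to a warning. Assembled from the two hunks, the resulting method reads:

    @Override protected void after() {
      super.after();
      if (kafkaServer != null) {
        kafkaServer.shutdown();
        kafkaServer.awaitShutdown();  // log files are released once shutdown completes
      }
      if (zkServer != null) {
        zkServer.shutdown();
      }
      try {
        FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
      } catch (IOException e) {
        LOG.warn("Did not clean " + tmpLogDir.toString(), e);  // best effort, not fatal
      }
    }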
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
index 86ed866..7c9ca37 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
@@ -145,7 +145,7 @@ public class TransactionalKafkaWriterTest {
 
   @After public void tearAfterTest() {
     KAFKA_BROKER_RESOURCE.deleteTopic(TOPIC);
-    consumer.close();
+    consumer.close(Duration.ZERO);
     consumer = null;
   }
 
@@ -229,7 +229,7 @@ public class TransactionalKafkaWriterTest {
     long numRecords = 0;
     boolean emptyPoll = false;
     while (numRecords < RECORD_NUMBER && !emptyPoll) {
-      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
+      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
 
       Assert.assertFalse(records.records(new TopicPartition(TOPIC, 0))
           .stream()