You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by bs...@apache.org on 2019/08/21 17:56:49 UTC
[hive] branch master updated: HIVE-22125: Update Kafka clients to 2.3 (Slim B via Ashutosh Chauchan)
This is an automated email from the ASF dual-hosted git repository.
bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1cf38b6 HIVE-22125: Update Kafka clients to 2.3 (Slim B via Ashutosh Chauchan)
1cf38b6 is described below
commit 1cf38b648e9517cbd84ee7faada72996bc757e56
Author: Slim Bouguerra <bs...@apache.org>
AuthorDate: Wed Aug 21 10:56:40 2019 -0700
HIVE-22125: Update Kafka clients to 2.3 (Slim B via Ashutosh Chauchan)
---
kafka-handler/pom.xml | 2 +-
.../hadoop/hive/kafka/HiveKafkaProducer.java | 9 ++-----
.../hadoop/hive/kafka/KafkaStorageHandler.java | 2 +-
.../hive/kafka/TransactionalKafkaWriter.java | 6 +++--
.../hadoop/hive/kafka/HiveKafkaProducerTest.java | 29 ++++++++++++++--------
.../hadoop/hive/kafka/KafkaBrokerResource.java | 10 ++++----
.../hive/kafka/TransactionalKafkaWriterTest.java | 4 +--
7 files changed, 33 insertions(+), 29 deletions(-)
diff --git a/kafka-handler/pom.xml b/kafka-handler/pom.xml
index 647b6a6..02f5a27 100644
--- a/kafka-handler/pom.xml
+++ b/kafka-handler/pom.xml
@@ -30,7 +30,7 @@
<properties>
<hive.path.to.root>..</hive.path.to.root>
- <kafka.version>2.2.0</kafka.version>
+ <kafka.version>2.3.0</kafka.version>
</properties>
<artifactId>kafka-handler</artifactId>
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
index ba27233..9f85681 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -45,7 +45,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
-
/**
* Kafka Producer with public methods to extract the producer state then resuming transaction in another process.
* This Producer is to be used only if you need to extract the transaction state and resume it from a different process.
@@ -133,15 +132,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
- Object nextSequence = getValue(transactionManager, "nextSequence");
- Object lastAckedSequence = getValue(transactionManager, "lastAckedSequence");
-
+ Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
invoke(transactionManager,
"transitionTo",
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
- invoke(nextSequence, "clear");
- invoke(lastAckedSequence, "clear");
-
+ invoke(topicPartitionBookkeeper, "reset");
Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
setValue(producerIdAndEpoch, "producerId", producerId);
setValue(producerIdAndEpoch, "epoch", epoch);
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
index d87f245..c3ddbb5 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -302,7 +302,7 @@ import java.util.function.Predicate;
RetryUtils.retry(buildProducersTask, isRetrayable, cleanUpTheMap, maxTries, "Error while Builing Producers");
} catch (Exception e) {
// Can not go further
- LOG.error("Can not fetch build produces due [{}]", e.getMessage());
+ LOG.error("Can not fetch build produces due [{}]", e);
throw new MetaException(e.getMessage());
}
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
index fb4d034..e4015fc 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -63,6 +64,7 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
private static final Logger LOG = LoggerFactory.getLogger(TransactionalKafkaWriter.class);
private static final String TRANSACTION_DIR = "transaction_states";
+ private static final Duration DURATION_0 = Duration.ofMillis(0);
private final String topic;
private final HiveKafkaProducer<byte[], byte[]> producer;
@@ -178,7 +180,7 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
} catch (Exception e) {
LOG.error("Aborting Transaction {} failed due to [{}]", writerIdTopicId, e.getMessage());
}
- producer.close(0, TimeUnit.MILLISECONDS);
+ producer.close(DURATION_0);
return;
}
@@ -209,11 +211,11 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
persistTxState();
}
checkExceptions();
- producer.close();
LOG.info("Closed writerId [{}], Sent [{}] records to Topic [{}]",
producer.getTransactionalId(),
sentRecords,
topic);
+ producer.close(Duration.ZERO);
}
private void commitTransaction() {
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
index db2515c..8c9ed5f 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -56,16 +57,17 @@ import java.util.stream.IntStream;
private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducerTest.class);
private static final int RECORD_NUMBER = 17384;
- private static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8"));
+ private static final byte[] KEY_BYTES = "KEY".getBytes(StandardCharsets.UTF_8);
private static final KafkaBrokerResource KAFKA_BROKER_RESOURCE = new KafkaBrokerResource();
private static final String TOPIC = "test-tx-producer";
private static final List<ProducerRecord<byte[], byte[]>>
RECORDS =
IntStream.range(0, RECORD_NUMBER).mapToObj(number -> {
- final byte[] value = ("VALUE-" + Integer.toString(number)).getBytes(Charset.forName("UTF-8"));
+ final byte[] value = ("VALUE-" + number).getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>(TOPIC, value, KEY_BYTES);
}).collect(Collectors.toList());
+ private static final short MAX_ATTEMPTS = 500;
@BeforeClass public static void setupCluster() throws Throwable {
KAFKA_BROKER_RESOURCE.before();
@@ -109,7 +111,8 @@ import java.util.stream.IntStream;
consumer = null;
}
- @Test public void resumeTransaction() {
+
+ @Test(timeout = 120_000) public void resumeTransaction() {
producer.initTransactions();
producer.beginTransaction();
long pid = producer.getProducerId();
@@ -119,7 +122,7 @@ import java.util.stream.IntStream;
//noinspection unchecked
RECORDS.forEach(producer::send);
producer.flush();
- producer.close();
+ producer.close(Duration.ZERO);
HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
secondProducer.resumeTransaction(pid, epoch);
@@ -132,11 +135,15 @@ import java.util.stream.IntStream;
consumer.seekToBeginning(assignment);
long numRecords = 0;
@SuppressWarnings("unchecked") final List<ConsumerRecord<byte[], byte[]>> actualRecords = new ArrayList();
- while (numRecords < RECORD_NUMBER) {
- ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+ short attempts = 0;
+ while (numRecords < RECORD_NUMBER && attempts++ < MAX_ATTEMPTS) {
+ ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1_000));
actualRecords.addAll(consumerRecords.records(new TopicPartition(TOPIC, 0)));
numRecords += consumerRecords.count();
}
+ if(attempts >= MAX_ATTEMPTS) {
+ Assert.fail("Reached max attempts and total number of records is " + numRecords);
+ }
Assert.assertEquals("Size matters !!", RECORDS.size(), actualRecords.size());
Iterator<ProducerRecord<byte[], byte[]>> expectedIt = RECORDS.iterator();
Iterator<ConsumerRecord<byte[], byte[]>> actualIt = actualRecords.iterator();
@@ -152,28 +159,28 @@ import java.util.stream.IntStream;
HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
secondProducer.resumeTransaction(3434L, (short) 12);
secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
- secondProducer.close();
+ secondProducer.close(Duration.ZERO);
}
@Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpoch() {
producer.initTransactions();
producer.beginTransaction();
long pid = producer.getProducerId();
- producer.close();
+ producer.close(Duration.ZERO);
HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
secondProducer.resumeTransaction(pid, (short) 12);
secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
- secondProducer.close();
+ secondProducer.close(Duration.ZERO);
}
@Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongPID() {
producer.initTransactions();
producer.beginTransaction();
short epoch = producer.getEpoch();
- producer.close();
+ producer.close(Duration.ZERO);
HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
secondProducer.resumeTransaction(45L, epoch);
secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
- secondProducer.close();
+ secondProducer.close(Duration.ZERO);
}
}
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index a79bf4f..287fff4 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -82,11 +82,6 @@ class KafkaBrokerResource extends ExternalResource {
*/
@Override protected void after() {
super.after();
- try {
- FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
- } catch (IOException e) {
- LOG.error("Error cleaning " + tmpLogDir.toString(), e);
- }
if (kafkaServer != null) {
kafkaServer.shutdown();
kafkaServer.awaitShutdown();
@@ -94,6 +89,11 @@ class KafkaBrokerResource extends ExternalResource {
if (zkServer != null) {
zkServer.shutdown();
}
+ try {
+ FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
+ } catch (IOException e) {
+ LOG.warn("did not clean " + tmpLogDir.toString(), e);
+ }
}
void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
index 86ed866..7c9ca37 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
@@ -145,7 +145,7 @@ public class TransactionalKafkaWriterTest {
@After public void tearAfterTest() {
KAFKA_BROKER_RESOURCE.deleteTopic(TOPIC);
- consumer.close();
+ consumer.close(Duration.ZERO);
consumer = null;
}
@@ -229,7 +229,7 @@ public class TransactionalKafkaWriterTest {
long numRecords = 0;
boolean emptyPoll = false;
while (numRecords < RECORD_NUMBER && !emptyPoll) {
- ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
Assert.assertFalse(records.records(new TopicPartition(TOPIC, 0))
.stream()