You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/03/02 00:44:39 UTC
[kafka] branch 2.5 updated: KAFKA-9573: Fix VerifiableProducer and
VerifiableConsumer to work with older Kafka versions (#8197)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new dd9f70e KAFKA-9573: Fix VerifiableProducer and VerifiableConsumer to work with older Kafka versions (#8197)
dd9f70e is described below
commit dd9f70e3d1734a455effc411ce092f6847aa0778
Author: Nikolay <ni...@apache.org>
AuthorDate: Mon Mar 2 03:31:15 2020 +0300
KAFKA-9573: Fix VerifiableProducer and VerifiableConsumer to work with older Kafka versions (#8197)
These classes are used by `upgrade_test.py` with old Kafka versions, so they can
only use functionality that exists in all of those Kafka versions. This change fixes
the test for Kafka versions older than 0.11.0.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../java/org/apache/kafka/tools/VerifiableConsumer.java | 13 ++++++++-----
.../java/org/apache/kafka/tools/VerifiableProducer.java | 15 +++++++++------
2 files changed, 17 insertions(+), 11 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index b65eb9a..5be76f0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -43,7 +43,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -633,7 +632,8 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
brokerHostandPort = res.getString("brokerList");
} else {
parser.printHelp();
- Exit.exit(0);
+ // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+ System.exit(0);
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
@@ -663,16 +663,19 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
ArgumentParser parser = argParser();
if (args.length == 0) {
parser.printHelp();
- Exit.exit(0);
+ // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+ System.exit(0);
}
try {
final VerifiableConsumer consumer = createFromArgs(parser, args);
- Exit.addShutdownHook("verifiable-consumer-shutdown-hook", () -> consumer.close());
+ // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0.
+ Runtime.getRuntime().addShutdownHook(new Thread(consumer::close, "verifiable-consumer-shutdown-hook"));
consumer.run();
} catch (ArgumentParserException e) {
parser.handleError(e);
- Exit.exit(1);
+ // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+ System.exit(1);
}
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index befdfdd..ee863d4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -34,7 +34,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Exit;
import java.io.IOException;
import java.io.InputStream;
@@ -240,7 +239,8 @@ public class VerifiableProducer implements AutoCloseable {
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
} else {
parser.printHelp();
- Exit.exit(0);
+ // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+ System.exit(0);
}
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@@ -528,7 +528,8 @@ public class VerifiableProducer implements AutoCloseable {
ArgumentParser parser = argParser();
if (args.length == 0) {
parser.printHelp();
- Exit.exit(0);
+ // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+ System.exit(0);
}
try {
@@ -537,7 +538,8 @@ public class VerifiableProducer implements AutoCloseable {
final long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
- Exit.addShutdownHook("verifiable-producer-shutdown-hook", () -> {
+ // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0.
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Trigger main thread to stop producing messages
producer.stopProducing = true;
@@ -549,12 +551,13 @@ public class VerifiableProducer implements AutoCloseable {
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
- });
+ }, "verifiable-producer-shutdown-hook"));
producer.run(throttler);
} catch (ArgumentParserException e) {
parser.handleError(e);
- Exit.exit(1);
+ // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+ System.exit(1);
}
}