You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/03/02 00:44:39 UTC

[kafka] branch 2.5 updated: KAFKA-9573: Fix VerifiableProducer and VerifiableConsumer to work with older Kafka versions (#8197)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new dd9f70e  KAFKA-9573: Fix VerifiableProducer and VerifiableConsumer to work with older Kafka versions (#8197)
dd9f70e is described below

commit dd9f70e3d1734a455effc411ce092f6847aa0778
Author: Nikolay <ni...@apache.org>
AuthorDate: Mon Mar 2 03:31:15 2020 +0300

    KAFKA-9573: Fix VerifiableProducer and VerifiableConsumer to work with older Kafka versions (#8197)
    
    These classes are used by `upgrade_test.py` with old Kafka versions so they can
    only use functionality that exists in all Kafka versions. This change fixes the test
    for Kafka versions older than 0.11.0.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../java/org/apache/kafka/tools/VerifiableConsumer.java   | 13 ++++++++-----
 .../java/org/apache/kafka/tools/VerifiableProducer.java   | 15 +++++++++------
 2 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index b65eb9a..5be76f0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -43,7 +43,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -633,7 +632,8 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
             brokerHostandPort = res.getString("brokerList");
         } else {
             parser.printHelp();
-            Exit.exit(0);
+            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+            System.exit(0);
         }
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
 
@@ -663,16 +663,19 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
         ArgumentParser parser = argParser();
         if (args.length == 0) {
             parser.printHelp();
-            Exit.exit(0);
+            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+            System.exit(0);
         }
         try {
             final VerifiableConsumer consumer = createFromArgs(parser, args);
-            Exit.addShutdownHook("verifiable-consumer-shutdown-hook", () -> consumer.close());
+            // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0.
+            Runtime.getRuntime().addShutdownHook(new Thread(consumer::close, "verifiable-consumer-shutdown-hook"));
 
             consumer.run();
         } catch (ArgumentParserException e) {
             parser.handleError(e);
-            Exit.exit(1);
+            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+            System.exit(1);
         }
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index befdfdd..ee863d4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -34,7 +34,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Exit;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -240,7 +239,8 @@ public class VerifiableProducer implements AutoCloseable {
             producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
         } else {
             parser.printHelp();
-            Exit.exit(0);
+            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+            System.exit(0);
         }
 
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@@ -528,7 +528,8 @@ public class VerifiableProducer implements AutoCloseable {
         ArgumentParser parser = argParser();
         if (args.length == 0) {
             parser.printHelp();
-            Exit.exit(0);
+            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+            System.exit(0);
         }
 
         try {
@@ -537,7 +538,8 @@ public class VerifiableProducer implements AutoCloseable {
             final long startMs = System.currentTimeMillis();
             ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
 
-            Exit.addShutdownHook("verifiable-producer-shutdown-hook", () -> {
+            // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0.
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                 // Trigger main thread to stop producing messages
                 producer.stopProducing = true;
 
@@ -549,12 +551,13 @@ public class VerifiableProducer implements AutoCloseable {
                 double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
 
                 producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
-            });
+            }, "verifiable-producer-shutdown-hook"));
 
             producer.run(throttler);
         } catch (ArgumentParserException e) {
             parser.handleError(e);
-            Exit.exit(1);
+            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+            System.exit(1);
         }
     }