Posted to commits@kafka.apache.org by sh...@apache.org on 2023/05/12 02:39:22 UTC

[kafka] branch trunk updated: KAFKA-14752: Kafka examples improvements - demo changes (#13517)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c757af5f7c6 KAFKA-14752: Kafka examples improvements - demo changes (#13517)
c757af5f7c6 is described below

commit c757af5f7c630d532bfee5f6dc45aec603ad8a29
Author: Federico Valeri <fe...@gmail.com>
AuthorDate: Fri May 12 04:39:12 2023 +0200

    KAFKA-14752: Kafka examples improvements - demo changes (#13517)
    
    KAFKA-14752: Kafka examples improvements - demo changes
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 build.gradle                                       |   1 -
 checkstyle/import-control-core.xml                 |   1 -
 examples/README                                    |  12 --
 examples/README.md                                 |   9 +
 .../kafka/examples/KafkaConsumerProducerDemo.java  |  66 +++++--
 .../java/kafka/examples/KafkaExactlyOnceDemo.java  | 220 ++++++---------------
 6 files changed, 121 insertions(+), 188 deletions(-)

diff --git a/build.gradle b/build.gradle
index 63c110aefc9..75ae5faa041 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1294,7 +1294,6 @@ project(':examples') {
 
   dependencies {
     implementation project(':clients')
-    implementation project(':server-common')
   }
 
   javadoc {
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index a08563a3b5a..f97dda31461 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -71,7 +71,6 @@
 
   <subpackage name="examples">
     <allow pkg="org.apache.kafka.clients" />
-    <allow pkg="org.apache.kafka.server.util" />
   </subpackage>
 
   <subpackage name="log.remote">
diff --git a/examples/README b/examples/README
deleted file mode 100644
index dc5b3947cd1..00000000000
--- a/examples/README
+++ /dev/null
@@ -1,12 +0,0 @@
-This directory contains examples of client code that uses kafka.
-
-To run the demo:
-
-   1. In Zookeeper mode, Start Zookeeper and the Kafka server. In KRaft mode, start the Kafka server.
-   2. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync`
-   3. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`
-   4. For exactly once demo run, `run bin/exactly-once-demo.sh 6 3 50000`,
-      this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records.
-   5. Some notes for exactly once demo:
-      5.1. The Kafka server has to be on broker version 2.5 or higher.
-      5.2. You could also use IntelliJ IDEA to run the example directly by configuring parameters as "Program arguments"
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 00000000000..6451982f0a5
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,9 @@
+# Kafka client examples
+
+This module contains some Kafka client examples.
+
+1. Start a Kafka 2.5+ local cluster with a plain listener configured on port 9092.
+2. Run `examples/bin/java-producer-consumer-demo.sh 10000` to asynchronously send 10k records to my-topic and consume them.
+3. Run `examples/bin/java-producer-consumer-demo.sh 10000 sync` to synchronously send 10k records to my-topic and consume them.
+4. Run `examples/bin/exactly-once-demo.sh 6 3 10000` to create input-topic and output-topic with 6 partitions each,
+   start 3 transactional application instances and process 10k records.
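The demos assume the local cluster from step 1 is reachable on localhost:9092. Not part of this commit: a minimal sketch, using only the public Admin client API, that can confirm the broker is up before running either demo (class name and output are illustrative).

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DescribeClusterResult;

    import java.util.Properties;

    public class ClusterCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Same bootstrap address the demos use; adjust if your listener differs.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Both futures block until the cluster responds or the request times out.
                DescribeClusterResult cluster = admin.describeCluster();
                System.out.println("Cluster id: " + cluster.clusterId().get());
                System.out.println("Nodes: " + cluster.nodes().get());
            }
        }
    }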
diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index ff44efe492e..3c6424c7dca 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -16,29 +16,59 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.common.errors.TimeoutException;
-
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This example can be decomposed into the following stages:
+ *
+ * 1. Clean any topics left from previous runs.
+ * 2. Create a producer thread to send a set of records to my-topic.
+ * 3. Create a consumer thread to fetch all previously sent records from my-topic.
+ *
+ * If you are using IntelliJ IDEA, the program arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * capture all of the log output in one place.
+ */
 public class KafkaConsumerProducerDemo {
-    public static void main(String[] args) throws InterruptedException {
-        boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
-        CountDownLatch latch = new CountDownLatch(2);
-        Producer producerThread = new Producer(
-            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
-        producerThread.start();
-
-        Consumer consumerThread = new Consumer(
-            "consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch);
-        consumerThread.start();
-
-        if (!latch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
-        }
+    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    public static final String TOPIC_NAME = "my-topic";
+    public static final String GROUP_NAME = "my-group";
+
+    public static void main(String[] args) {
+        try {
+            if (args.length == 0) {
+                Utils.printHelp("This example takes 2 parameters (i.e. 10000 sync):%n" +
+                    "- records: total number of records to send (required)%n" +
+                    "- mode: pass 'sync' to send records synchronously (optional)");
+                return;
+            }
 
-        consumerThread.shutdown();
-        System.out.println("All finished!");
+            int numRecords = Integer.parseInt(args[0]);
+            boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync");
+
+            // stage 1: clean any topics left from previous runs
+            Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
+            CountDownLatch latch = new CountDownLatch(2);
+
+            // stage 2: produce records to my-topic
+            Producer producerThread = new Producer(
+                "producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
+            producerThread.start();
+
+            // stage 3: consume records from my-topic
+            Consumer consumerThread = new Consumer(
+                "consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
+            consumerThread.start();
+
+            if (!latch.await(5, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 5 minutes waiting for termination");
+                producerThread.shutdown();
+                consumerThread.shutdown();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
     }
 }
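Both demos now delegate topic cleanup to Utils.recreateTopics, a helper added elsewhere in this commit and not shown in this diff. A hypothetical sketch of what such a helper could look like, inferred from the call sites above and from the recreateTopics/deleteTopic code removed from KafkaExactlyOnceDemo below; the class name and the treatment of a non-positive partition count as "use broker defaults" are assumptions, not the committed code.

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.errors.TopicExistsException;
    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.Collectors;

    public class UtilsSketch {
        // Deletes and recreates the given topics; numPartitions <= 0 is assumed
        // to mean "use the broker defaults" (the demos pass -1 in that case).
        public static void recreateTopics(String bootstrapServers, int numPartitions, String... topicNames)
                throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            try (Admin admin = Admin.create(props)) {
                List<String> topics = Arrays.asList(topicNames);
                // Delete leftovers from previous runs, ignoring topics that do not exist.
                try {
                    admin.deleteTopics(topics).all().get();
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                        throw e;
                    }
                }
                // Wait until the deleted topics disappear from the cluster metadata.
                while (!Collections.disjoint(admin.listTopics().names().get(), topics)) {
                    Thread.sleep(1_000);
                }
                List<NewTopic> newTopics = topics.stream()
                    .map(name -> {
                        Optional<Integer> parts = numPartitions > 0 ? Optional.of(numPartitions) : Optional.empty();
                        return new NewTopic(name, parts, Optional.empty());
                    })
                    .collect(Collectors.toList());
                // Recreate, retrying while metadata of the old topics is still being cleared.
                while (true) {
                    try {
                        admin.createTopics(newTopics).all().get();
                        return;
                    } catch (ExecutionException e) {
                        if (!(e.getCause() instanceof TopicExistsException)) {
                            throw e;
                        }
                        Thread.sleep(1_000);
                    }
                }
            }
        }
    }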
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index 1a2cfcb8a24..9d94337491d 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -16,185 +16,93 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- *    the input topic. The driver will block for the record generation to finish, so the producer
- *    must be in synchronous sending mode.
+ * This example can be decomposed into the following stages:
  *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- *    tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- *    drain all the records from either given partitions or auto assigned partitions by actively
- *    comparing log end offset with committed offset. Each record will be processed exactly once
- *    as dividing the key by 2, and extend the value message. The driver will block for all the record
- *    processing to finish. The transformed record shall be written to the output topic, with
- *    transactional guarantee.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ *    The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ *    (see {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either the given
+ *    partitions or auto-assigned partitions by actively comparing the log end offset with the committed offset.
+ *    Each record will be processed exactly once, with a strong partition-level ordering guarantee.
+ *    The demo will block until all records are processed and written to the output topic.
+ * 4. Create a read_committed consumer thread to verify that all records are present in the output topic
+ *    and that record ordering at the partition level is maintained.
+ *    The demo will block until all committed records are consumed, with a transactional guarantee.
  *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- *    the output topic, while the message ordering on partition level is maintained.
- *    The driver will block for the consumption of all committed records.
- *
- * From this demo, you could see that all the records from pre-population are processed exactly once,
- * with strong partition level ordering guarantee.
- *
- * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5
- * in order to run, otherwise the app could throw
+ * The broker version must be >= 2.5.0 to run this demo, otherwise the example will throw
  * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ *
+ * If you are using IntelliJ IDEA, the program arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * capture all of the log output in one place.
  */
 public class KafkaExactlyOnceDemo {
     public static final String BOOTSTRAP_SERVERS = "localhost:9092";
     private static final String INPUT_TOPIC = "input-topic";
     private static final String OUTPUT_TOPIC = "output-topic";
+    public static final String GROUP_NAME = "check-group";
 
-    public static void main(String[] args) throws InterruptedException, ExecutionException {
-        if (args.length != 3) {
-            throw new IllegalArgumentException("Should accept 3 parameters: " +
-                "[number of partitions], [number of instances], [number of records]");
-        }
-
-        int numPartitions = Integer.parseInt(args[0]);
-        int numInstances = Integer.parseInt(args[1]);
-        int numRecords = Integer.parseInt(args[2]);
-
-        /* Stage 1: topic cleanup and recreation */
-        recreateTopics(numPartitions);
-
-        CountDownLatch prePopulateLatch = new CountDownLatch(1);
-
-        /* Stage 2: pre-populate records */
-        Producer producerThread = new Producer(
-            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
-        producerThread.start();
-
-        if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
-        }
-
-        CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
-
-        /* Stage 3: transactionally process all messages */
-        CountDownLatch processorsLatch = new CountDownLatch(numInstances);
-        List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
-            .mapToObj(id -> new ExactlyOnceMessageProcessor(
-                "processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
-            .collect(Collectors.toList());
-        processors.forEach(ExactlyOnceMessageProcessor::start);
-
-        if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
-        }
-
-        CountDownLatch consumeLatch = new CountDownLatch(1);
-
-        /* Stage 4: consume all processed messages to verify exactly once */
-        Consumer consumerThread = new Consumer(
-            "consumer", "DemoConsumer", OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
-        consumerThread.start();
-
-        if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
-        }
-
-        consumerThread.shutdown();
-        System.out.println("All finished!");
-    }
-
-    private static void recreateTopics(final int numPartitions)
-        throws ExecutionException, InterruptedException {
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-            KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-
-        Admin adminClient = Admin.create(props);
-
-        List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
-
-        deleteTopic(adminClient, topicsToDelete);
-
-        // Check topic existence in a retry loop
-        while (true) {
-            System.out.println("Making sure the topics are deleted successfully: " + topicsToDelete);
-
-            Set<String> listedTopics = adminClient.listTopics().names().get();
-            System.out.println("Current list of topics: " + listedTopics);
-
-            boolean hasTopicInfo = false;
-            for (String listedTopic : listedTopics) {
-                if (topicsToDelete.contains(listedTopic)) {
-                    hasTopicInfo = true;
-                    break;
-                }
-            }
-            if (!hasTopicInfo) {
-                break;
+    public static void main(String[] args) {
+        try {
+            if (args.length != 3) {
+                Utils.printHelp("This example takes 3 parameters (i.e. 6 3 10000):%n" +
+                    "- partition: number of partitions for input and output topics (required)%n" +
+                    "- instances: number of application instances (required)%n" +
+                    "- records: total number of records (required)");
+                return;
             }
-            Thread.sleep(1000);
-        }
 
-        // Create topics in a retry loop
-        while (true) {
-            final short replicationFactor = 1;
-            final List<NewTopic> newTopics = Arrays.asList(
-                new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor),
-                new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor));
-            try {
-                adminClient.createTopics(newTopics).all().get();
-                System.out.println("Created new topics: " + newTopics);
-                break;
-            } catch (ExecutionException e) {
-                if (!(e.getCause() instanceof TopicExistsException)) {
-                    throw e;
-                }
-                System.out.println("Metadata of the old topics are not cleared yet...");
-
-                deleteTopic(adminClient, topicsToDelete);
+            int numPartitions = Integer.parseInt(args[0]);
+            int numInstances = Integer.parseInt(args[1]);
+            int numRecords = Integer.parseInt(args[2]);
+
+            // stage 1: clean any topics left from previous runs
+            Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);
+
+            // stage 2: send demo records to the input-topic
+            CountDownLatch producerLatch = new CountDownLatch(1);
+            Producer producerThread = new Producer(
+                "producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch);
+            producerThread.start();
+            if (!producerLatch.await(2, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 2 minutes waiting for data load");
+                producerThread.shutdown();
+                return;
+            }
 
-                Thread.sleep(1000);
+            // stage 3: read from the input-topic, process each record exactly once, and write to the output-topic
+            CountDownLatch processorsLatch = new CountDownLatch(numInstances);
+            List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
+                .mapToObj(id -> new ExactlyOnceMessageProcessor(
+                    "processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
+                .collect(Collectors.toList());
+            processors.forEach(ExactlyOnceMessageProcessor::start);
+            if (!processorsLatch.await(2, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 2 minutes waiting for record copy");
+                processors.forEach(ExactlyOnceMessageProcessor::shutdown);
+                return;
             }
-        }
-    }
 
-    private static void deleteTopic(final Admin adminClient, final List<String> topicsToDelete)
-        throws InterruptedException, ExecutionException {
-        try {
-            adminClient.deleteTopics(topicsToDelete).all().get();
-        } catch (ExecutionException e) {
-            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
-                throw e;
+            // stage 4: verify that all processed records can be consumed from the output-topic
+            CountDownLatch consumerLatch = new CountDownLatch(1);
+            Consumer consumerThread = new Consumer(
+                "consumer", BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, Optional.empty(), true, numRecords, consumerLatch);
+            consumerThread.start();
+            if (!consumerLatch.await(2, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 2 minutes waiting for output read");
+                consumerThread.shutdown();
             }
-            System.out.println("Encountered exception during topic deletion: " + e.getCause());
+        } catch (Throwable e) {
+            e.printStackTrace();
         }
-        System.out.println("Deleted old topics: " + topicsToDelete);
     }
 }
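The read-process-write loop described in stage 3 lives in ExactlyOnceMessageProcessor, which this diff does not modify. For readers following the javadoc, a minimal sketch of such a transactional loop built on the standard producer transactions API available since broker 2.5; this is not the actual processor, and the topic, group, and transactional id names are illustrative.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class EosLoopSketch {
        public static void main(String[] args) {
            Properties pc = new Properties();
            pc.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            pc.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "processor-0"); // must be unique per instance
            pc.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            pc.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            Properties cc = new Properties();
            cc.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            cc.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-group");
            cc.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            cc.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets commit with the transaction
            cc.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            cc.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            cc.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(pc);
                 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cc)) {
                producer.initTransactions();
                consumer.subscribe(Collections.singleton("input-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    if (records.isEmpty()) {
                        continue;
                    }
                    producer.beginTransaction();
                    try {
                        for (ConsumerRecord<String, String> record : records) {
                            producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                        }
                        // Commit the consumed offsets atomically with the produced records.
                        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                        for (TopicPartition tp : records.partitions()) {
                            List<ConsumerRecord<String, String>> partRecords = records.records(tp);
                            long next = partRecords.get(partRecords.size() - 1).offset() + 1;
                            offsets.put(tp, new OffsetAndMetadata(next));
                        }
                        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                        producer.commitTransaction();
                    } catch (KafkaException e) {
                        // Abort so this batch's reads and writes are retried together.
                        // Fatal errors such as ProducerFencedException instead require
                        // closing the producer rather than aborting.
                        producer.abortTransaction();
                    }
                }
            }
        }
    }

The key step is sendOffsetsToTransaction with the consumer's group metadata: it commits the consumed offsets and the produced records atomically, which is what provides the partition-level exactly-once guarantee the javadoc describes.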