Posted to jira@kafka.apache.org by "fvaleri (via GitHub)" <gi...@apache.org> on 2023/05/11 16:32:55 UTC

[GitHub] [kafka] fvaleri commented on a diff in pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes

fvaleri commented on code in PR #13517:
URL: https://github.com/apache/kafka/pull/13517#discussion_r1191435518


##########
examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java:
##########
@@ -16,29 +16,59 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.common.errors.TimeoutException;
-
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This example can be decomposed into the following stages:
+ *
+ * 1. Clean any topics left from previous runs.
+ * 2. Create a producer thread to send a set of records to topic1.
+ * 3. Create a consumer thread to fetch all previously sent records from topic1.
+ *
+ * If you are using IntelliJ IDEA, the program arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
+ */
 public class KafkaConsumerProducerDemo {
-    public static void main(String[] args) throws InterruptedException {
-        boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
-        CountDownLatch latch = new CountDownLatch(2);
-        Producer producerThread = new Producer(
-            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
-        producerThread.start();
-
-        Consumer consumerThread = new Consumer(
-            "consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch);
-        consumerThread.start();
-
-        if (!latch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
-        }
+    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    public static final String TOPIC_NAME = "my-topic";
+    public static final String GROUP_NAME = "my-group";
+
+    public static void main(String[] args) {
+        try {
+            if (args.length == 0) {
+                Utils.printHelp("This example takes 2 parameters (e.g. 10000 sync):%n" +
+                    "- records: total number of records to send (required)%n" +
+                    "- mode: pass 'sync' to send records synchronously (optional)");
+                return;
+            }
 
-        consumerThread.shutdown();
-        System.out.println("All finished!");
+            int numRecords = Integer.parseInt(args[0]);
+            boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync");
+
+            // stage 1: clean any topics left from previous runs
+            Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
+            CountDownLatch latch = new CountDownLatch(2);
+
+            // stage 2: produce records to topic1
+            Producer producerThread = new Producer(
+                "producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
+            producerThread.start();
+
+            // stage 3: consume records from topic1
+            Consumer consumerThread = new Consumer(
+                "consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
+            consumerThread.start();
+
+            if (!latch.await(5, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 5 minutes waiting for termination");
+                //producerThread.shutdown();
+                consumerThread.shutdown();

Review Comment:
   Rebase issue, fixed.
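
For readers without the full diff: the optional `sync` argument only changes how the Producer thread calls send(). A minimal, self-contained sketch of the two modes, assuming a plain KafkaProducer (the bootstrap address and topic name mirror the BOOTSTRAP_SERVERS and TOPIC_NAME constants above; the PR's actual Producer class may differ):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class SendModeSketch {
        public static void main(String[] args) throws Exception {
            boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
                ProducerRecord<Integer, String> record = new ProducerRecord<>("my-topic", 0, "test0");
                if (isAsync) {
                    // Async: send() returns immediately and the outcome is
                    // reported to the callback on the producer I/O thread.
                    producer.send(record, (metadata, exception) -> {
                        if (exception != null) exception.printStackTrace();
                    });
                } else {
                    // Sync: block on the returned future until the broker
                    // acknowledges the write (or the send fails).
                    RecordMetadata metadata = producer.send(record).get();
                    System.out.printf("Acked at %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        }
    }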



##########
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##########
@@ -16,182 +16,90 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- *    the input topic. The driver will block for the record generation to finish, so the producer
- *    must be in synchronous sending mode.
- *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- *    tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- *    drain all the records from either given partitions or auto assigned partitions by actively
- *    comparing log end offset with committed offset. Each record will be processed exactly once
- *    as dividing the key by 2, and extend the value message. The driver will block for all the record
- *    processing to finish. The transformed record shall be written to the output topic, with
- *    transactional guarantee.
+ * This example can be decomposed into the following stages:
  *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- *    the output topic, while the message ordering on partition level is maintained.
- *    The driver will block for the consumption of all committed records.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ *    The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ *    (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ *    partitions or auto assigned partitions by actively comparing log end offset with committed offset.
+ *    Each record will be processed exactly once, dividing the key by 2 and extending the value record.
+ *    The demo will block until all records are processed and written to the output topic.

Review Comment:
   Yep.
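
As background on the stage 3 description above: the read-process-write loop in ExactlyOnceMessageProcessor follows the standard Kafka transactional pattern, where the consumed offsets are committed inside the producer's transaction. A minimal sketch, assuming pre-configured clients (transactional.id set on the producer, enable.auto.commit=false on the consumer); the PR's actual processor may differ:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;

    public class EosLoopSketch {
        static void copyLoop(KafkaConsumer<Integer, String> consumer,
                             KafkaProducer<Integer, String> producer) {
            producer.initTransactions();
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
                if (records.isEmpty()) continue;
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<Integer, String> record : records) {
                        // Process once: divide the key by 2 and extend the value.
                        producer.send(new ProducerRecord<>("output-topic",
                            record.key() / 2, record.value() + "-processed"));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    }
                    // Committing the consumed offsets inside the transaction is
                    // what makes read and write atomic: both happen or neither.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (KafkaException e) {
                    // After an abort the uncommitted input records are re-read
                    // and re-processed, so no output is duplicated or lost.
                    producer.abortTransaction();
                }
            }
        }
    }

The sendOffsetsToTransaction(offsets, consumer.groupMetadata()) overload used here is what requires brokers >= 2.5.0, matching the version note in the javadoc.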



##########
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##########
@@ -16,182 +16,90 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- *    the input topic. The driver will block for the record generation to finish, so the producer
- *    must be in synchronous sending mode.
- *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- *    tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- *    drain all the records from either given partitions or auto assigned partitions by actively
- *    comparing log end offset with committed offset. Each record will be processed exactly once
- *    as dividing the key by 2, and extend the value message. The driver will block for all the record
- *    processing to finish. The transformed record shall be written to the output topic, with
- *    transactional guarantee.
+ * This example can be decomposed into the following stages:
  *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- *    the output topic, while the message ordering on partition level is maintained.
- *    The driver will block for the consumption of all committed records.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ *    The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ *    (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ *    partitions or auto assigned partitions by actively comparing log end offset with committed offset.
+ *    Each record will be processed exactly once, dividing the key by 2 and extending the value record.
+ *    The demo will block until all records are processed and written to the output topic.
+ * 4. Create a read_committed consumer thread to verify that we have all records in the output topic,
+ *    and that record ordering is maintained at the partition level.
+ *    The demo will block for the consumption of all committed records.
  *
- * From this demo, you could see that all the records from pre-population are processed exactly once,
- * with strong partition level ordering guarantee.

Review Comment:
   Yep. Added in line 35.
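
On the stage 4 verification: it relies on the consumer being configured with isolation.level=read_committed, so records from aborted transactions are never returned by poll(). A sketch of that configuration (the group name mirrors the GROUP_NAME constant in the diff; the rest is ordinary consumer setup, not the PR's exact code):

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class ReadCommittedSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "check-group");
            // Only return records from committed transactions; aborted or
            // still-open transactional records are filtered out.
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singleton("output-topic"));
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(5));
                System.out.println("Committed records seen: " + records.count());
            }
        }
    }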



##########
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##########
@@ -16,182 +16,90 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- *    the input topic. The driver will block for the record generation to finish, so the producer
- *    must be in synchronous sending mode.
- *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- *    tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- *    drain all the records from either given partitions or auto assigned partitions by actively
- *    comparing log end offset with committed offset. Each record will be processed exactly once
- *    as dividing the key by 2, and extend the value message. The driver will block for all the record
- *    processing to finish. The transformed record shall be written to the output topic, with
- *    transactional guarantee.
+ * This example can be decomposed into the following stages:
  *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- *    the output topic, while the message ordering on partition level is maintained.
- *    The driver will block for the consumption of all committed records.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ *    The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ *    (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ *    partitions or auto assigned partitions by actively comparing log end offset with committed offset.
+ *    Each record will be processed exactly once, dividing the key by 2 and extending the value record.
+ *    The demo will block until all records are processed and written to the output topic.
+ * 4. Create a read_committed consumer thread to verify that we have all records in the output topic,
+ *    and that record ordering is maintained at the partition level.
+ *    The demo will block for the consumption of all committed records.
  *
- * From this demo, you could see that all the records from pre-population are processed exactly once,
- * with strong partition level ordering guarantee.
- *
- * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5
- * in order to run, otherwise the app could throw
+ * The broker version must be >= 2.5.0 to run this example, otherwise it will throw
  * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ *
+ * If you are using IntelliJ IDEA, the program arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
  */
 public class KafkaExactlyOnceDemo {
-
+    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
     private static final String INPUT_TOPIC = "input-topic";
     private static final String OUTPUT_TOPIC = "output-topic";
+    public static final String GROUP_NAME = "check-group";
 
-    public static void main(String[] args) throws InterruptedException, ExecutionException {
-        if (args.length != 3) {
-            throw new IllegalArgumentException("Should accept 3 parameters: " +
-                "[number of partitions], [number of instances], [number of records]");
-        }
-
-        int numPartitions = Integer.parseInt(args[0]);
-        int numInstances = Integer.parseInt(args[1]);
-        int numRecords = Integer.parseInt(args[2]);
-
-        /* Stage 1: topic cleanup and recreation */
-        recreateTopics(numPartitions);
-
-        CountDownLatch prePopulateLatch = new CountDownLatch(1);
-
-        /* Stage 2: pre-populate records */
-        Producer producerThread = new Producer(
-            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
-        producerThread.start();
-
-        if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
-        }
-
-        CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
-
-        /* Stage 3: transactionally process all messages */
-        for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
-            ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
-                INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch);
-            messageProcessor.start();
-        }
-
-        if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
-        }
-
-        CountDownLatch consumeLatch = new CountDownLatch(1);
-
-        /* Stage 4: consume all processed messages to verify exactly once */
-        Consumer consumerThread = new Consumer(
-            "consumer", "DemoConsumer", OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
-        consumerThread.start();
-
-        if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
-        }
-
-        consumerThread.shutdown();
-        System.out.println("All finished!");
-    }
-
-    private static void recreateTopics(final int numPartitions)
-        throws ExecutionException, InterruptedException {
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-            KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-
-        Admin adminClient = Admin.create(props);
-
-        List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
-
-        deleteTopic(adminClient, topicsToDelete);
-
-        // Check topic existence in a retry loop
-        while (true) {
-            System.out.println("Making sure the topics are deleted successfully: " + topicsToDelete);
-
-            Set<String> listedTopics = adminClient.listTopics().names().get();
-            System.out.println("Current list of topics: " + listedTopics);
-
-            boolean hasTopicInfo = false;
-            for (String listedTopic : listedTopics) {
-                if (topicsToDelete.contains(listedTopic)) {
-                    hasTopicInfo = true;
-                    break;
-                }
-            }
-            if (!hasTopicInfo) {
-                break;
+    public static void main(String[] args) {
+        try {
+            if (args.length != 3) {
+                Utils.printHelp("This example takes 3 parameters (e.g. 6 3 10000):%n" +
+                    "- partition: number of partitions for input and output topics (required)%n" +
+                    "- instances: number of application instances (required)%n" +
+                    "- records: total number of records (required)");
+                return;
             }
-            Thread.sleep(1000);
-        }
-
-        // Create topics in a retry loop
-        while (true) {
-            final short replicationFactor = 1;
-            final List<NewTopic> newTopics = Arrays.asList(
-                new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor),
-                new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor));
-            try {
-                adminClient.createTopics(newTopics).all().get();
-                System.out.println("Created new topics: " + newTopics);
-                break;
-            } catch (ExecutionException e) {
-                if (!(e.getCause() instanceof TopicExistsException)) {
-                    throw e;
-                }
-                System.out.println("Metadata of the old topics are not cleared yet...");
 
-                deleteTopic(adminClient, topicsToDelete);
+            int numPartitions = Integer.parseInt(args[0]);
+            int numInstances = Integer.parseInt(args[1]);
+            int numRecords = Integer.parseInt(args[2]);
+
+            // stage 1: clean any topics left from previous runs
+            Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);
+
+            // stage 2: send demo records to the input-topic
+            CountDownLatch producerLatch = new CountDownLatch(1);
+            Producer producerThread = new Producer(
+                "producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch);
+            producerThread.start();
+            if (!producerLatch.await(2, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 2 minutes waiting for data load");
+                producerThread.shutdown();
+                return;
+            }
 
-                Thread.sleep(1000);
+            // stage 3: read from input-topic, process once and write to the output-topic
+            CountDownLatch processorsLatch = new CountDownLatch(numInstances);
+            for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
+                ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
+                    INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, processorsLatch);
+                messageProcessor.start();
+            }
+            if (!processorsLatch.await(2, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 2 minutes waiting for record copy");
+                //processors.forEach(ExactlyOnceMessageProcessor::shutdown);

Review Comment:
   There was no shutdown before the new processor merge. Added now.
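
The Utils.recreateTopics helper called in stage 1 is not part of this hunk. Based on the inlined delete/wait/create retry logic it replaces (the removed lines above), it can be sketched roughly as below; names and error handling are illustrative, not the PR's actual code. Passing a non-positive partition count falls back to the broker default, which would explain the -1 used by KafkaConsumerProducerDemo:

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.errors.TopicExistsException;
    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.Collectors;

    public class TopicUtilsSketch {
        public static void recreateTopics(String bootstrapServers, int numPartitions, String... topicNames)
                throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            List<String> topics = Arrays.asList(topicNames);
            try (Admin admin = Admin.create(props)) {
                // Delete leftovers from previous runs; ignore missing topics.
                try {
                    admin.deleteTopics(topics).all().get();
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) throw e;
                }
                // Wait until the deletion is reflected in the topic list.
                while (!Collections.disjoint(admin.listTopics().names().get(), topics)) {
                    Thread.sleep(1000);
                }
                // Create the topics, retrying while old metadata lingers.
                while (true) {
                    List<NewTopic> newTopics = topics.stream()
                        .map(name -> numPartitions > 0
                            ? new NewTopic(name, numPartitions, (short) 1)
                            : new NewTopic(name, Optional.empty(), Optional.empty()))
                        .collect(Collectors.toList());
                    try {
                        admin.createTopics(newTopics).all().get();
                        break;
                    } catch (ExecutionException e) {
                        if (!(e.getCause() instanceof TopicExistsException)) throw e;
                        Thread.sleep(1000);
                    }
                }
            }
        }
    }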



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org