Posted to jira@kafka.apache.org by "fvaleri (via GitHub)" <gi...@apache.org> on 2023/04/06 13:41:49 UTC

[GitHub] [kafka] fvaleri opened a new pull request, #13514: KAFKA-14752: Kafka examples improvements - consumer changes

fvaleri opened a new pull request, #13514:
URL: https://github.com/apache/kafka/pull/13514

   This is extracted from the original PR for better review.
   https://github.com/apache/kafka/pull/13492
   




[GitHub] [kafka] fvaleri commented on pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on PR #13514:
URL: https://github.com/apache/kafka/pull/13514#issuecomment-1531090993

   Hi @showuon, I missed that. Thanks.




[GitHub] [kafka] fvaleri commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1165546954


##########
examples/src/main/java/kafka/examples/Utils.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.examples;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+public class Utils {
+    private Utils() {
+    }
+
+    public static void printHelp(String message, Object... args) {
+        System.out.println(format(message, args));
+    }
+
+    public static void printOut(String message, Object... args) {
+        System.out.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
+    }
+
+    public static void printErr(String message, Object... args) {
+        System.err.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
+    }
+
+    public static void maybePrintRecord(long numRecords, ConsumerRecord<Integer, String> record) {
+        maybePrintRecord(numRecords, record.key(), record.value(), record.topic(), record.partition(), record.offset());
+    }
+
+    public static void maybePrintRecord(long numRecords, int key, String value, RecordMetadata metadata) {
+        maybePrintRecord(numRecords, key, value, metadata.topic(), metadata.partition(), metadata.offset());
+    }
+
+    private static void maybePrintRecord(long numRecords, int key, String value, String topic, int partition, long offset) {
+        // we only print 10 records when there are 20 or more to send

Review Comment:
   The current simple "message sampling" logic triggers only when numRecords >= 20, so the comment is correct, but I'm open to suggestions for a different sampling strategy to avoid clogging the output.
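For illustration, the sampling predicate under discussion amounts to the following (a sketch; `shouldPrint` is a hypothetical helper, not a method in the PR):

```java
// prints roughly 10 evenly spaced records once numRecords >= 20;
// below 20 the divisor clamps to 1, so every record is printed
static boolean shouldPrint(long numRecords, int key) {
    return key % Math.max(1, numRecords / 10) == 0;
}
// numRecords = 20  -> divisor 2  -> keys 0, 2, ..., 18 print (10 records)
// numRecords = 100 -> divisor 10 -> keys 0, 10, ..., 90 print (10 records)
```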



##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -22,96 +22,110 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
+ * The thread does not stop until all records are completed or an exception is raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-    private final KafkaConsumer<Integer, String> consumer;
+    private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final int numMessageToConsume;
-    private int messageRemaining;
+    private final Optional<String> instanceId;
+    private final boolean readCommitted;
+    private final int numRecords;
     private final CountDownLatch latch;
+    private volatile boolean closed;
+    private int remainingRecords;
 
-    public Consumer(final String topic,
-                    final String groupId,
-                    final Optional<String> instanceId,
-                    final boolean readCommitted,
-                    final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample");
-        this.groupId = groupId;
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        if (readCommitted) {
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        }
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        consumer = new KafkaConsumer<>(props);
+    public Consumer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    String groupId,
+                    Optional<String> instanceId,
+                    boolean readCommitted,
+                    int numRecords,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
-        this.numMessageToConsume = numMessageToConsume;
-        this.messageRemaining = numMessageToConsume;
+        this.groupId = groupId;
+        this.instanceId = instanceId;
+        this.readCommitted = readCommitted;
+        this.numRecords = numRecords;
+        this.remainingRecords = numRecords;
         this.latch = latch;
     }
 
-    KafkaConsumer<Integer, String> get() {
-        return consumer;
-    }
-
     @Override
     public void run() {
-        try {
-            System.out.println("Subscribe to:" + this.topic);
-            consumer.subscribe(Collections.singletonList(this.topic), this);
-            do {
-                doWork();
-            } while (messageRemaining > 0);
-            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
-        } catch (WakeupException e) {
-            // swallow the wakeup
-        } catch (Exception e) {
-            System.out.println("Unexpected termination, exception thrown:" + e);
-        } finally {
-            shutdown();
+        // the consumer instance is NOT thread safe
+        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            consumer.subscribe(singleton(topic), this);
+            Utils.printOut("Subscribed to %s", topic);
+            while (!closed && remainingRecords > 0) {
+                try {
+                    // next poll must be called within session.timeout.ms to avoid rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    for (ConsumerRecord<Integer, String> record : records) {
+                        Utils.maybePrintRecord(numRecords, record);
+                    }
+                    remainingRecords -= records.count();
+                } catch (Throwable e) {
+                    // add your application retry strategy here
+                    Utils.printErr(e.getMessage());
+                    break;
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Fatal error");
+            e.printStackTrace();
         }
+        Utils.printOut("Fetched %d records", numRecords - remainingRecords);
+        shutdown();
     }
-    public void doWork() {
-        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
-        for (ConsumerRecord<Integer, String> record : records) {
-            System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        messageRemaining -= records.count();
     }
 
-    public void shutdown() {
-        this.consumer.close();
-        latch.countDown();
+    public KafkaConsumer<Integer, String> createKafkaConsumer() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");

Review Comment:
   Thanks.





[GitHub] [kafka] showuon commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1176408614


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -76,12 +79,17 @@ public Consumer(String threadName,
     public void run() {
         // the consumer instance is NOT thread safe
         try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            // subscribes to a list of topics to get dynamically assigned partitions
+            // this class implements the rebalance listener that we pass here to be notified of such events
             consumer.subscribe(singleton(topic), this);
             Utils.printOut("Subscribed to %s", topic);
             while (!closed && remainingRecords > 0) {
                 try {
-                    // next poll must be called within session.timeout.ms to avoid rebalance
-                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    // if required, poll updates partition assignment and invokes the configured rebalance listener
+                    // then tries to fetch records sequentially using the last committed offset or auto.offset.reset policy
+                    // returns immediately if there are records or times out returning an empty record set
+                    // the next poll must be called within session.timeout.ms to avoid group rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));

Review Comment:
   Why do we poll with 10 seconds? Doesn't it exceed `session.timeout.ms` value?
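For context, a sketch of the poll-timeout semantics under discussion, assuming post-KIP-62 consumer behavior (heartbeats are sent from a background thread, so the poll duration itself does not count against `session.timeout.ms`):

```java
// the Duration only bounds how long this call may block waiting for records;
// liveness is tracked separately: the background heartbeat thread must satisfy
// session.timeout.ms, and successive poll() calls must satisfy max.poll.interval.ms
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));
```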



##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -91,9 +99,13 @@ public void run() {
                     // we can't recover from these exceptions
                     Utils.printErr(e.getMessage());
                     shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();

Review Comment:
   Is this commit necessary?



##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -91,9 +99,13 @@ public void run() {
                     // we can't recover from these exceptions
                     Utils.printErr(e.getMessage());
                     shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());

Review Comment:
   We should feed the partitions in the exception. That is:
   `consumer.seekToEnd(e.partitions())`
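A minimal sketch of the requested change (both exceptions extend `InvalidOffsetException`, so `e.partitions()` resolves in the multi-catch and reports exactly the affected partitions):

```java
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
    // seek to the end of only the partitions with an invalid or missing offset
    Utils.printOut("Invalid or no offset found for %s, using latest", e.partitions());
    consumer.seekToEnd(e.partitions());
    consumer.commitSync();
}
```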





[GitHub] [kafka] showuon commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1176408614


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -76,12 +79,17 @@ public Consumer(String threadName,
     public void run() {
         // the consumer instance is NOT thread safe
         try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            // subscribes to a list of topics to get dynamically assigned partitions
+            // this class implements the rebalance listener that we pass here to be notified of such events
             consumer.subscribe(singleton(topic), this);
             Utils.printOut("Subscribed to %s", topic);
             while (!closed && remainingRecords > 0) {
                 try {
-                    // next poll must be called within session.timeout.ms to avoid rebalance
-                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    // if required, poll updates partition assignment and invokes the configured rebalance listener
+                    // then tries to fetch records sequentially using the last committed offset or auto.offset.reset policy
+                    // returns immediately if there are records or times out returning an empty record set
+                    // the next poll must be called within session.timeout.ms to avoid group rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));

Review Comment:
   Why do we poll with 10 seconds now? I think 1 sec should be long enough.





[GitHub] [kafka] showuon commented on pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13514:
URL: https://github.com/apache/kafka/pull/13514#issuecomment-1531149504

   Thanks for the update. I'll merge it after the CI build completes.




[GitHub] [kafka] showuon commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1179880945


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -91,9 +99,13 @@ public void run() {
                     // we can't recover from these exceptions
                     Utils.printErr(e.getMessage());
                     shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();

Review Comment:
   Fair enough.





[GitHub] [kafka] clolov commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1170144013


##########
examples/src/main/java/kafka/examples/Utils.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.examples;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+public class Utils {
+    private Utils() {
+    }
+
+    public static void printHelp(String message, Object... args) {
+        System.out.println(format(message, args));
+    }
+
+    public static void printOut(String message, Object... args) {
+        System.out.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
+    }
+
+    public static void printErr(String message, Object... args) {
+        System.err.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
+    }
+
+    public static void maybePrintRecord(long numRecords, ConsumerRecord<Integer, String> record) {
+        maybePrintRecord(numRecords, record.key(), record.value(), record.topic(), record.partition(), record.offset());
+    }
+
+    public static void maybePrintRecord(long numRecords, int key, String value, RecordMetadata metadata) {
+        maybePrintRecord(numRecords, key, value, metadata.topic(), metadata.partition(), metadata.offset());
+    }
+
+    private static void maybePrintRecord(long numRecords, int key, String value, String topic, int partition, long offset) {
+        // we only print 10 records when there are 20 or more to send

Review Comment:
   Ah, yes, my bad, I had to put pen to paper to figure out that this does indeed only start printing 10 records once we reach 20 or more.





[GitHub] [kafka] showuon commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1179881955


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -91,9 +99,13 @@ public void run() {
                     // we can't recover from these exceptions
                     Utils.printErr(e.getMessage());
                     shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());

Review Comment:
   Yes, I agree it won't make a difference in this demo case. But I think we should provide a good example here to let users know that, in their client applications, they should use `consumer.seekToEnd(e.partitions())` to seek to the end of the affected partitions only, not all of the assigned partitions. Does that make sense?





[GitHub] [kafka] showuon commented on pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13514:
URL: https://github.com/apache/kafka/pull/13514#issuecomment-1531036253

   @fvaleri, there is a checkstyle failure. Could you fix it?
   ```
   Execution failed for task ':examples:checkstyleMain'.
   
   ```
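The failing task can usually be reproduced locally by running the `examples:checkstyleMain` Gradle task before pushing.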




[GitHub] [kafka] showuon merged pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon merged PR #13514:
URL: https://github.com/apache/kafka/pull/13514




[GitHub] [kafka] showuon commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1173486311


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -21,97 +21,120 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
+ * The thread does not stop until all records are completed or an exception is raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-    private final KafkaConsumer<Integer, String> consumer;
+    private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final int numMessageToConsume;
-    private int messageRemaining;
+    private final Optional<String> instanceId;
+    private final boolean readCommitted;
+    private final int numRecords;
     private final CountDownLatch latch;
+    private volatile boolean closed;
+    private int remainingRecords;
 
-    public Consumer(final String topic,
-                    final String groupId,
-                    final Optional<String> instanceId,
-                    final boolean readCommitted,
-                    final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample");
-        this.groupId = groupId;
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        if (readCommitted) {
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        }
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        consumer = new KafkaConsumer<>(props);
+    public Consumer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    String groupId,
+                    Optional<String> instanceId,
+                    boolean readCommitted,
+                    int numRecords,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
-        this.numMessageToConsume = numMessageToConsume;
-        this.messageRemaining = numMessageToConsume;
+        this.groupId = groupId;
+        this.instanceId = instanceId;
+        this.readCommitted = readCommitted;
+        this.numRecords = numRecords;
+        this.remainingRecords = numRecords;
         this.latch = latch;
     }
 
-    KafkaConsumer<Integer, String> get() {
-        return consumer;
-    }
-
     @Override
     public void run() {
-        try {
-            System.out.println("Subscribe to:" + this.topic);
-            consumer.subscribe(Collections.singletonList(this.topic), this);
-            do {
-                doWork();
-            } while (messageRemaining > 0);
-            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
-        } catch (WakeupException e) {
-            // swallow the wakeup
-        } catch (Exception e) {
-            System.out.println("Unexpected termination, exception thrown:" + e);
-        } finally {
-            shutdown();
+        // the consumer instance is NOT thread safe
+        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            consumer.subscribe(singleton(topic), this);
+            Utils.printOut("Subscribed to %s", topic);
+            while (!closed && remainingRecords > 0) {
+                try {
+                    // next poll must be called within session.timeout.ms to avoid rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    for (ConsumerRecord<Integer, String> record : records) {
+                        Utils.maybePrintRecord(numRecords, record);
+                    }
+                    remainingRecords -= records.count();
+                } catch (AuthorizationException | UnsupportedVersionException
+                         | RecordDeserializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (KafkaException e) {
+                    // log the exception and try to continue
+                    // you can add your application retry strategy here
+                    Utils.printErr(e.getMessage());
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Fetched %d records", numRecords - remainingRecords);
+        shutdown();
     }
-    public void doWork() {
-        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
-        for (ConsumerRecord<Integer, String> record : records) {
-            System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        messageRemaining -= records.count();
     }
 
-    public void shutdown() {
-        this.consumer.close();
-        latch.countDown();
+    public KafkaConsumer<Integer, String> createKafkaConsumer() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");

Review Comment:
   I'm thinking we can add a comment above each config, to explain why we set it and what it means, ex:
   ```
   // bootstrap server config is required for the consumer to connect to brokers
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
   // client id is not required, but it's good to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging
   props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
   // consumer group id, required when we use subscribe(topics) for group management
   props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
   ```
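A sketch of what an annotated `createKafkaConsumer` could look like, following this suggestion (the comments paraphrase the config docs; the rationale given for disabling auto-commit in read_committed mode is an assumption based on this example suite):

```java
public KafkaConsumer<Integer, String> createKafkaConsumer() {
    Properties props = new Properties();
    // bootstrap.servers: brokers used to establish the initial connection to the cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // client.id: not required, but lets a logical application name appear in broker-side request logs
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
    // group.id: required when using subscribe(topics) for group management
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // group.instance.id: enables static membership, avoiding rebalances on restarts within session.timeout.ms
    instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
    // enable.auto.commit: disabled in read_committed mode, where offsets are assumed
    // to be committed as part of the producer's transaction
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");
    // deserializers must match the serializers used on the producing side
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    if (readCommitted) {
        // isolation.level: skip records from aborted or still-open transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    }
    // auto.offset.reset: start from the beginning when no committed offset is found
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(props);
}
```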



##########
examples/src/main/java/kafka/examples/Utils.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.examples;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+public class Utils {
+    private Utils() {
+    }
+
+    public static void printHelp(String message, Object... args) {
+        System.out.println(format(message, args));
+    }
+
+    public static void printOut(String message, Object... args) {
+        System.out.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
+    }
+
+    public static void printErr(String message, Object... args) {
+        System.err.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
+    }
+
+    public static void maybePrintRecord(long numRecords, ConsumerRecord<Integer, String> record) {
+        maybePrintRecord(numRecords, record.key(), record.value(), record.topic(), record.partition(), record.offset());
+    }
+
+    public static void maybePrintRecord(long numRecords, int key, String value, RecordMetadata metadata) {
+        maybePrintRecord(numRecords, key, value, metadata.topic(), metadata.partition(), metadata.offset());
+    }
+
+    private static void maybePrintRecord(long numRecords, int key, String value, String topic, int partition, long offset) {
+        // we only print 10 records when there are 20 or more to send
+        if (key % Math.max(1, numRecords / 10) == 0) {
+            printOut("Sample: record(%d, %s), partition(%s-%d), offset(%d)", key, value, topic, partition, offset);
+        }
+    }
+
+    public static void recreateTopics(String bootstrapServers, int numPartitions, String... topicNames) {
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        try (Admin admin = Admin.create(props)) {
+            // delete topics if present
+            try {
+                admin.deleteTopics(Arrays.asList(topicNames)).all().get();
+            } catch (ExecutionException e) {
+                if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                    throw e;
+                }
+                printErr("Topics deletion error: %s", e.getCause());
+            }
+            printOut("Deleted topics: %s", Arrays.toString(topicNames));
+            // create topics in a retry loop
+            while (true) {
+                // use default RF to avoid NOT_ENOUGH_REPLICAS error with minISR>1

Review Comment:
   nit: add spaces: `minISR > 1`



##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -21,97 +21,120 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
+ * The thread does not stop until all records are completed or an exception is raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-    private final KafkaConsumer<Integer, String> consumer;
+    private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final int numMessageToConsume;
-    private int messageRemaining;
+    private final Optional<String> instanceId;
+    private final boolean readCommitted;
+    private final int numRecords;
     private final CountDownLatch latch;
+    private volatile boolean closed;
+    private int remainingRecords;
 
-    public Consumer(final String topic,
-                    final String groupId,
-                    final Optional<String> instanceId,
-                    final boolean readCommitted,
-                    final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample");
-        this.groupId = groupId;
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        if (readCommitted) {
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        }
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        consumer = new KafkaConsumer<>(props);
+    public Consumer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    String groupId,
+                    Optional<String> instanceId,
+                    boolean readCommitted,
+                    int numRecords,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
-        this.numMessageToConsume = numMessageToConsume;
-        this.messageRemaining = numMessageToConsume;
+        this.groupId = groupId;
+        this.instanceId = instanceId;
+        this.readCommitted = readCommitted;
+        this.numRecords = numRecords;
+        this.remainingRecords = numRecords;
         this.latch = latch;
     }
 
-    KafkaConsumer<Integer, String> get() {
-        return consumer;
-    }
-
     @Override
     public void run() {
-        try {
-            System.out.println("Subscribe to:" + this.topic);
-            consumer.subscribe(Collections.singletonList(this.topic), this);
-            do {
-                doWork();
-            } while (messageRemaining > 0);
-            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
-        } catch (WakeupException e) {
-            // swallow the wakeup
-        } catch (Exception e) {
-            System.out.println("Unexpected termination, exception thrown:" + e);
-        } finally {
-            shutdown();
+        // the consumer instance is NOT thread safe
+        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            consumer.subscribe(singleton(topic), this);
+            Utils.printOut("Subscribed to %s", topic);
+            while (!closed && remainingRecords > 0) {
+                try {
+                    // next poll must be called within session.timeout.ms to avoid rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    for (ConsumerRecord<Integer, String> record : records) {
+                        Utils.maybePrintRecord(numRecords, record);
+                    }
+                    remainingRecords -= records.count();
+                } catch (AuthorizationException | UnsupportedVersionException
+                         | RecordDeserializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (KafkaException e) {
+                    // log the exception and try to continue
+                    // you can add your application retry strategy here
+                    Utils.printErr(e.getMessage());
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Fetched %d records", numRecords - remainingRecords);
+        shutdown();
     }
-    public void doWork() {
-        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
-        for (ConsumerRecord<Integer, String> record : records) {
-            System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        messageRemaining -= records.count();
     }
 
-    public void shutdown() {
-        this.consumer.close();
-        latch.countDown();
+    public KafkaConsumer<Integer, String> createKafkaConsumer() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        if (readCommitted) {
+            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        }
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return new KafkaConsumer<>(props);
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-        System.out.println("Revoking partitions:" + partitions);
+        Utils.printOut("Revoked partitions: %s", partitions);
     }

Review Comment:
   There should be 3 callbacks available; could you add all of them?
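The three `ConsumerRebalanceListener` callbacks referred to are `onPartitionsAssigned`, `onPartitionsRevoked` and `onPartitionsLost`; a sketch of implementing all of them in this class:

```java
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    Utils.printOut("Assigned partitions: %s", partitions);
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    Utils.printOut("Revoked partitions: %s", partitions);
}

@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
    // invoked on exceptional cases (e.g. session timeout) where partitions may already
    // be owned elsewhere; the default implementation delegates to onPartitionsRevoked
    Utils.printOut("Lost partitions: %s", partitions);
}
```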



##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -21,97 +21,120 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
+ * The thread does not stop until all records are completed or an exception is raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-    private final KafkaConsumer<Integer, String> consumer;
+    private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final int numMessageToConsume;
-    private int messageRemaining;
+    private final Optional<String> instanceId;
+    private final boolean readCommitted;
+    private final int numRecords;
     private final CountDownLatch latch;
+    private volatile boolean closed;
+    private int remainingRecords;
 
-    public Consumer(final String topic,
-                    final String groupId,
-                    final Optional<String> instanceId,
-                    final boolean readCommitted,
-                    final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample");
-        this.groupId = groupId;
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        if (readCommitted) {
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        }
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        consumer = new KafkaConsumer<>(props);
+    public Consumer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    String groupId,
+                    Optional<String> instanceId,
+                    boolean readCommitted,
+                    int numRecords,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
-        this.numMessageToConsume = numMessageToConsume;
-        this.messageRemaining = numMessageToConsume;
+        this.groupId = groupId;
+        this.instanceId = instanceId;
+        this.readCommitted = readCommitted;
+        this.numRecords = numRecords;
+        this.remainingRecords = numRecords;
         this.latch = latch;
     }
 
-    KafkaConsumer<Integer, String> get() {
-        return consumer;
-    }
-
     @Override
     public void run() {
-        try {
-            System.out.println("Subscribe to:" + this.topic);
-            consumer.subscribe(Collections.singletonList(this.topic), this);
-            do {
-                doWork();
-            } while (messageRemaining > 0);
-            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
-        } catch (WakeupException e) {
-            // swallow the wakeup
-        } catch (Exception e) {
-            System.out.println("Unexpected termination, exception thrown:" + e);
-        } finally {
-            shutdown();
+        // the consumer instance is NOT thread safe
+        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            consumer.subscribe(singleton(topic), this);
+            Utils.printOut("Subscribed to %s", topic);
+            while (!closed && remainingRecords > 0) {
+                try {
+                    // next poll must be called within session.timeout.ms to avoid rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));

Review Comment:
   Thanks for adding a comment above. But I think we should add more, because `consumer.poll` is the most important method in a consumer application. You can refer to the KafkaConsumer javadoc for it.



##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -21,97 +21,120 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
+ * The thread does not stop until all records are completed or an exception is raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-    private final KafkaConsumer<Integer, String> consumer;
+    private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final int numMessageToConsume;
-    private int messageRemaining;
+    private final Optional<String> instanceId;
+    private final boolean readCommitted;
+    private final int numRecords;
     private final CountDownLatch latch;
+    private volatile boolean closed;
+    private int remainingRecords;
 
-    public Consumer(final String topic,
-                    final String groupId,
-                    final Optional<String> instanceId,
-                    final boolean readCommitted,
-                    final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample");
-        this.groupId = groupId;
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        if (readCommitted) {
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        }
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        consumer = new KafkaConsumer<>(props);
+    public Consumer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    String groupId,
+                    Optional<String> instanceId,
+                    boolean readCommitted,
+                    int numRecords,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
-        this.numMessageToConsume = numMessageToConsume;
-        this.messageRemaining = numMessageToConsume;
+        this.groupId = groupId;
+        this.instanceId = instanceId;
+        this.readCommitted = readCommitted;
+        this.numRecords = numRecords;
+        this.remainingRecords = numRecords;
         this.latch = latch;
     }
 
-    KafkaConsumer<Integer, String> get() {
-        return consumer;
-    }
-
     @Override
     public void run() {
-        try {
-            System.out.println("Subscribe to:" + this.topic);
-            consumer.subscribe(Collections.singletonList(this.topic), this);
-            do {
-                doWork();
-            } while (messageRemaining > 0);
-            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
-        } catch (WakeupException e) {
-            // swallow the wakeup
-        } catch (Exception e) {
-            System.out.println("Unexpected termination, exception thrown:" + e);
-        } finally {
-            shutdown();
+        // the consumer instance is NOT thread safe
+        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            consumer.subscribe(singleton(topic), this);

Review Comment:
   We can add a comment above to explain what this line is doing, and what `this` means as the 2nd parameter.
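   
   For example, something along these lines (just a sketch, exact wording up to you):
   
   ```java
   // subscribe to the topic to get dynamically assigned partitions;
   // `this` (2nd parameter) registers the class itself as the ConsumerRebalanceListener,
   // so onPartitionsRevoked/onPartitionsAssigned are invoked on group rebalance
   consumer.subscribe(singleton(topic), this);
   ```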



##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -21,97 +21,120 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
+ * The thread does not stop until all records are completed or an exception is raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-    private final KafkaConsumer<Integer, String> consumer;
+    private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final int numMessageToConsume;
-    private int messageRemaining;
+    private final Optional<String> instanceId;
+    private final boolean readCommitted;
+    private final int numRecords;
     private final CountDownLatch latch;
+    private volatile boolean closed;
+    private int remainingRecords;
 
-    public Consumer(final String topic,
-                    final String groupId,
-                    final Optional<String> instanceId,
-                    final boolean readCommitted,
-                    final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample");
-        this.groupId = groupId;
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        if (readCommitted) {
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        }
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        consumer = new KafkaConsumer<>(props);
+    public Consumer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    String groupId,
+                    Optional<String> instanceId,
+                    boolean readCommitted,
+                    int numRecords,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
-        this.numMessageToConsume = numMessageToConsume;
-        this.messageRemaining = numMessageToConsume;
+        this.groupId = groupId;
+        this.instanceId = instanceId;
+        this.readCommitted = readCommitted;
+        this.numRecords = numRecords;
+        this.remainingRecords = numRecords;
         this.latch = latch;
     }
 
-    KafkaConsumer<Integer, String> get() {
-        return consumer;
-    }
-
     @Override
     public void run() {
-        try {
-            System.out.println("Subscribe to:" + this.topic);
-            consumer.subscribe(Collections.singletonList(this.topic), this);
-            do {
-                doWork();
-            } while (messageRemaining > 0);
-            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
-        } catch (WakeupException e) {
-            // swallow the wakeup
-        } catch (Exception e) {
-            System.out.println("Unexpected termination, exception thrown:" + e);
-        } finally {
-            shutdown();
+        // the consumer instance is NOT thread safe
+        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            consumer.subscribe(singleton(topic), this);
+            Utils.printOut("Subscribed to %s", topic);
+            while (!closed && remainingRecords > 0) {
+                try {
+                    // next poll must be called within session.timeout.ms to avoid rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    for (ConsumerRecord<Integer, String> record : records) {
+                        Utils.maybePrintRecord(numRecords, record);
+                    }
+                    remainingRecords -= records.count();
+                } catch (AuthorizationException | UnsupportedVersionException
+                         | RecordDeserializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (KafkaException e) {

Review Comment:
   I think we should also add `InvalidOffsetException` handling, to reset to the latest offset (with some explanation). WDYT?
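   
   Something like this, for example (rough sketch; `emptyList()` here would be `java.util.Collections.emptyList`):
   
   ```java
   } catch (InvalidOffsetException e) {
       // invalid offset, or no committed offset and no auto.offset.reset policy:
       // reset to the latest offset and keep polling
       Utils.printOut("Invalid or no offset found, using latest");
       consumer.seekToEnd(emptyList());
   }
   ```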



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1174433855


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -21,97 +21,120 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
+ * The thread does not stop until all records are completed or an exception is raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-    private final KafkaConsumer<Integer, String> consumer;
+    private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final int numMessageToConsume;
-    private int messageRemaining;
+    private final Optional<String> instanceId;
+    private final boolean readCommitted;
+    private final int numRecords;
     private final CountDownLatch latch;
+    private volatile boolean closed;
+    private int remainingRecords;
 
-    public Consumer(final String topic,
-                    final String groupId,
-                    final Optional<String> instanceId,
-                    final boolean readCommitted,
-                    final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample");
-        this.groupId = groupId;
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        if (readCommitted) {
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        }
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        consumer = new KafkaConsumer<>(props);
+    public Consumer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    String groupId,
+                    Optional<String> instanceId,
+                    boolean readCommitted,
+                    int numRecords,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
-        this.numMessageToConsume = numMessageToConsume;
-        this.messageRemaining = numMessageToConsume;
+        this.groupId = groupId;
+        this.instanceId = instanceId;
+        this.readCommitted = readCommitted;
+        this.numRecords = numRecords;
+        this.remainingRecords = numRecords;
         this.latch = latch;
     }
 
-    KafkaConsumer<Integer, String> get() {
-        return consumer;
-    }
-
     @Override
     public void run() {
-        try {
-            System.out.println("Subscribe to:" + this.topic);
-            consumer.subscribe(Collections.singletonList(this.topic), this);
-            do {
-                doWork();
-            } while (messageRemaining > 0);
-            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
-        } catch (WakeupException e) {
-            // swallow the wakeup
-        } catch (Exception e) {
-            System.out.println("Unexpected termination, exception thrown:" + e);
-        } finally {
-            shutdown();
+        // the consumer instance is NOT thread safe
+        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            consumer.subscribe(singleton(topic), this);
+            Utils.printOut("Subscribed to %s", topic);
+            while (!closed && remainingRecords > 0) {
+                try {
+                    // next poll must be called within session.timeout.ms to avoid rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    for (ConsumerRecord<Integer, String> record : records) {
+                        Utils.maybePrintRecord(numRecords, record);
+                    }
+                    remainingRecords -= records.count();
+                } catch (AuthorizationException | UnsupportedVersionException
+                         | RecordDeserializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (KafkaException e) {
+                    // log the exception and try to continue
+                    // you can add your application retry strategy here
+                    Utils.printErr(e.getMessage());
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Fetched %d records", numRecords - remainingRecords);
+        shutdown();
     }
-    public void doWork() {
-        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
-        for (ConsumerRecord<Integer, String> record : records) {
-            System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        messageRemaining -= records.count();
     }
 
-    public void shutdown() {
-        this.consumer.close();
-        latch.countDown();
+    public KafkaConsumer<Integer, String> createKafkaConsumer() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        if (readCommitted) {
+            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        }
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return new KafkaConsumer<>(props);
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-        System.out.println("Revoking partitions:" + partitions);
+        Utils.printOut("Revoked partitions: %s", partitions);
     }

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1174585082


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -21,97 +21,120 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
+ * The thread does not stop until all records are completed or an exception is raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-    private final KafkaConsumer<Integer, String> consumer;
+    private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final int numMessageToConsume;
-    private int messageRemaining;
+    private final Optional<String> instanceId;
+    private final boolean readCommitted;
+    private final int numRecords;
     private final CountDownLatch latch;
+    private volatile boolean closed;
+    private int remainingRecords;
 
-    public Consumer(final String topic,
-                    final String groupId,
-                    final Optional<String> instanceId,
-                    final boolean readCommitted,
-                    final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample");
-        this.groupId = groupId;
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        if (readCommitted) {
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        }
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        consumer = new KafkaConsumer<>(props);
+    public Consumer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    String groupId,
+                    Optional<String> instanceId,
+                    boolean readCommitted,
+                    int numRecords,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
-        this.numMessageToConsume = numMessageToConsume;
-        this.messageRemaining = numMessageToConsume;
+        this.groupId = groupId;
+        this.instanceId = instanceId;
+        this.readCommitted = readCommitted;
+        this.numRecords = numRecords;
+        this.remainingRecords = numRecords;
         this.latch = latch;
     }
 
-    KafkaConsumer<Integer, String> get() {
-        return consumer;
-    }
-
     @Override
     public void run() {
-        try {
-            System.out.println("Subscribe to:" + this.topic);
-            consumer.subscribe(Collections.singletonList(this.topic), this);
-            do {
-                doWork();
-            } while (messageRemaining > 0);
-            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
-        } catch (WakeupException e) {
-            // swallow the wakeup
-        } catch (Exception e) {
-            System.out.println("Unexpected termination, exception thrown:" + e);
-        } finally {
-            shutdown();
+        // the consumer instance is NOT thread safe
+        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            consumer.subscribe(singleton(topic), this);
+            Utils.printOut("Subscribed to %s", topic);
+            while (!closed && remainingRecords > 0) {
+                try {
+                    // next poll must be called within session.timeout.ms to avoid rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    for (ConsumerRecord<Integer, String> record : records) {
+                        Utils.maybePrintRecord(numRecords, record);
+                    }
+                    remainingRecords -= records.count();
+                } catch (AuthorizationException | UnsupportedVersionException
+                         | RecordDeserializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (KafkaException e) {

Review Comment:
   Yeah, I think it is definitely worth doing. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1180003269


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -91,9 +99,13 @@ public void run() {
                     // we can't recover from these exceptions
                     Utils.printErr(e.getMessage());
                     shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());

Review Comment:
   Yes, that makes sense. Thanks Luke.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1177496559


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -76,12 +79,17 @@ public Consumer(String threadName,
     public void run() {
         // the consumer instance is NOT thread safe
         try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            // subscribes to a list of topics to get dynamically assigned partitions
+            // this class implements the rebalance listener that we pass here to be notified of such events
             consumer.subscribe(singleton(topic), this);
             Utils.printOut("Subscribed to %s", topic);
             while (!closed && remainingRecords > 0) {
                 try {
-                    // next poll must be called within session.timeout.ms to avoid rebalance
-                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    // if required, poll updates partition assignment and invokes the configured rebalance listener
+                    // then tries to fetch records sequentially using the last committed offset or auto.offset.reset policy
+                    // returns immediately if there are records or times out returning an empty record set
+                    // the next poll must be called within session.timeout.ms to avoid group rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));

Review Comment:
   I can revert that, considering that the examples are supposed to be run on localhost and payloads are very small.
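   
   i.e. back to the original one-second timeout:
   
   ```java
   ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
   ```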



##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -91,9 +99,13 @@ public void run() {
                     // we can't recover from these exceptions
                     Utils.printErr(e.getMessage());
                     shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());

Review Comment:
   I think this is correct. The javadoc says: "If no partitions are provided, seek to the final offset for all of the currently assigned partitions."
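   
   So, when called from the poll loop where partitions are already assigned, it should be roughly equivalent to:
   
   ```java
   // explicitly seek to the end of the currently assigned partitions
   consumer.seekToEnd(consumer.assignment());
   ```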



##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -91,9 +99,13 @@ public void run() {
                     // we can't recover from these exceptions
                     Utils.printErr(e.getMessage());
                     shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();

Review Comment:
   In the exactly-once demo (auto commit disabled), what happens if you seek to the end and there are no transactions to process in the next cycles? I think you will seek again after every consumer restart, until some transaction is processed and its offsets are committed. I know this can't happen in this demo, but it could happen in theory, so I think this commit is correct.
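   
   In other words, the commit persists the reset position (sketch):
   
   ```java
   consumer.seekToEnd(emptyList());
   // commit the new position for all assigned partitions, so that a restarted
   // consumer resumes from here instead of resetting again
   consumer.commitSync();
   ```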



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1163804088


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -22,96 +22,110 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
+ * The thread does not stop until all records are completed or an exception is raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-    private final KafkaConsumer<Integer, String> consumer;
+    private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final int numMessageToConsume;
-    private int messageRemaining;
+    private final Optional<String> instanceId;
+    private final boolean readCommitted;
+    private final int numRecords;
     private final CountDownLatch latch;
+    private volatile boolean closed;
+    private int remainingRecords;
 
-    public Consumer(final String topic,
-                    final String groupId,
-                    final Optional<String> instanceId,
-                    final boolean readCommitted,
-                    final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample");
-        this.groupId = groupId;
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        if (readCommitted) {
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        }
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        consumer = new KafkaConsumer<>(props);
+    public Consumer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    String groupId,
+                    Optional<String> instanceId,
+                    boolean readCommitted,
+                    int numRecords,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
-        this.numMessageToConsume = numMessageToConsume;
-        this.messageRemaining = numMessageToConsume;
+        this.groupId = groupId;
+        this.instanceId = instanceId;
+        this.readCommitted = readCommitted;
+        this.numRecords = numRecords;
+        this.remainingRecords = numRecords;
         this.latch = latch;
     }
 
-    KafkaConsumer<Integer, String> get() {
-        return consumer;
-    }
-
     @Override
     public void run() {
-        try {
-            System.out.println("Subscribe to:" + this.topic);
-            consumer.subscribe(Collections.singletonList(this.topic), this);
-            do {
-                doWork();
-            } while (messageRemaining > 0);
-            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
-        } catch (WakeupException e) {
-            // swallow the wakeup
-        } catch (Exception e) {
-            System.out.println("Unexpected termination, exception thrown:" + e);
-        } finally {
-            shutdown();
+        // the consumer instance is NOT thread safe
+        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            consumer.subscribe(singleton(topic), this);
+            Utils.printOut("Subscribed to %s", topic);
+            while (!closed && remainingRecords > 0) {
+                try {
+                    // next poll must be called within session.timeout.ms to avoid rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    for (ConsumerRecord<Integer, String> record : records) {
+                        Utils.maybePrintRecord(numRecords, record);
+                    }
+                    remainingRecords -= records.count();
+                } catch (Throwable e) {
+                    // add your application retry strategy here
+                    Utils.printErr(e.getMessage());
+                    break;
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Fatal error");
+            e.printStackTrace();
         }
+        Utils.printOut("Fetched %d records", numRecords - remainingRecords);
+        shutdown();
     }
-    public void doWork() {
-        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
-        for (ConsumerRecord<Integer, String> record : records) {
-            System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        messageRemaining -= records.count();
     }
 
-    public void shutdown() {
-        this.consumer.close();
-        latch.countDown();
+    public KafkaConsumer<Integer, String> createKafkaConsumer() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");

Review Comment:
   ```suggestion
           props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, !readCommitted);
   ```
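   
   (`props.put` accepts an `Object` value and `enable.auto.commit` is defined as a boolean config, so passing the boolean directly should work.)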



##########
examples/src/main/java/kafka/examples/Utils.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.examples;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+public class Utils {
+    private Utils() {
+    }
+
+    public static void printHelp(String message, Object... args) {
+        System.out.println(format(message, args));
+    }
+
+    public static void printOut(String message, Object... args) {
+        System.out.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
+    }
+
+    public static void printErr(String message, Object... args) {
+        System.err.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
+    }
+
+    public static void maybePrintRecord(long numRecords, ConsumerRecord<Integer, String> record) {
+        maybePrintRecord(numRecords, record.key(), record.value(), record.topic(), record.partition(), record.offset());
+    }
+
+    public static void maybePrintRecord(long numRecords, int key, String value, RecordMetadata metadata) {
+        maybePrintRecord(numRecords, key, value, metadata.topic(), metadata.partition(), metadata.offset());
+    }
+
+    private static void maybePrintRecord(long numRecords, int key, String value, String topic, int partition, long offset) {
+        // we only print 10 records when there are 20 or more to send

Review Comment:
   ```suggestion
           // we only print 10 records when there are more than 10 to send
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13514:
URL: https://github.com/apache/kafka/pull/13514#issuecomment-1531392005

   Failed tests are unrelated:
   ```
       Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary
       Build / JDK 8 and Scala 2.12 / kafka.api.PlaintextConsumerTest.testMultiConsumerDefaultAssignorAndVerifyAssignment()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testRestartReplication()
       Build / JDK 11 and Scala 2.13 / kafka.admin.AddPartitionsTest.testWrongReplicaCount(String).quorum=zk
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[5] Type=Raft-Combined, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV2, Security=PLAINTEXT
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org