You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "clolov (via GitHub)" <gi...@apache.org> on 2023/04/12 08:35:23 UTC

[GitHub] [kafka] clolov commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

clolov commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1163804088


##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -22,96 +22,110 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
+ * The thread runs until the expected number of records has been fetched, {@link #shutdown()} is called, or an exception is raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-    private final KafkaConsumer<Integer, String> consumer;
+    private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final int numMessageToConsume;
-    private int messageRemaining;
+    private final Optional<String> instanceId;
+    private final boolean readCommitted;
+    private final int numRecords;
     private final CountDownLatch latch;
+    private volatile boolean closed;
+    private int remainingRecords;
 
-    public Consumer(final String topic,
-                    final String groupId,
-                    final Optional<String> instanceId,
-                    final boolean readCommitted,
-                    final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample");
-        this.groupId = groupId;
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        if (readCommitted) {
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        }
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        consumer = new KafkaConsumer<>(props);
+    public Consumer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    String groupId,
+                    Optional<String> instanceId,
+                    boolean readCommitted,
+                    int numRecords,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
-        this.numMessageToConsume = numMessageToConsume;
-        this.messageRemaining = numMessageToConsume;
+        this.groupId = groupId;
+        this.instanceId = instanceId;
+        this.readCommitted = readCommitted;
+        this.numRecords = numRecords;
+        this.remainingRecords = numRecords;
         this.latch = latch;
     }
 
-    KafkaConsumer<Integer, String> get() {
-        return consumer;
-    }
-
     @Override
     public void run() {
-        try {
-            System.out.println("Subscribe to:" + this.topic);
-            consumer.subscribe(Collections.singletonList(this.topic), this);
-            do {
-                doWork();
-            } while (messageRemaining > 0);
-            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
-        } catch (WakeupException e) {
-            // swallow the wakeup
-        } catch (Exception e) {
-            System.out.println("Unexpected termination, exception thrown:" + e);
-        } finally {
-            shutdown();
+        // the consumer instance is NOT thread safe
+        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
+            consumer.subscribe(singleton(topic), this);
+            Utils.printOut("Subscribed to %s", topic);
+            while (!closed && remainingRecords > 0) {
+                try {
+                    // next poll must be called within session.timeout.ms to avoid rebalance
+                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
+                    for (ConsumerRecord<Integer, String> record : records) {
+                        Utils.maybePrintRecord(numRecords, record);
+                    }
+                    remainingRecords -= records.count();
+                } catch (Throwable e) {
+                    // add your application retry strategy here
+                    Utils.printErr(e.getMessage());
+                    break;
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Fatal error");
+            e.printStackTrace();
         }
+        Utils.printOut("Fetched %d records", numRecords - remainingRecords);
+        shutdown();
     }
-    public void doWork() {
-        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
-        for (ConsumerRecord<Integer, String> record : records) {
-            System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        messageRemaining -= records.count();
     }
 
-    public void shutdown() {
-        this.consumer.close();
-        latch.countDown();
+    public KafkaConsumer<Integer, String> createKafkaConsumer() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");

Review Comment:
   ```suggestion
           props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, !readCommitted);
   ```



##########
examples/src/main/java/kafka/examples/Utils.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.examples;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+public class Utils {
+    private Utils() {
+    }
+
+    public static void printHelp(String message, Object... args) {
+        System.out.println(format(message, args));
+    }
+
+    public static void printOut(String message, Object... args) {
+        System.out.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
+    }
+
+    public static void printErr(String message, Object... args) {
+        System.err.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
+    }
+
+    public static void maybePrintRecord(long numRecords, ConsumerRecord<Integer, String> record) {
+        maybePrintRecord(numRecords, record.key(), record.value(), record.topic(), record.partition(), record.offset());
+    }
+
+    public static void maybePrintRecord(long numRecords, int key, String value, RecordMetadata metadata) {
+        maybePrintRecord(numRecords, key, value, metadata.topic(), metadata.partition(), metadata.offset());
+    }
+
+    private static void maybePrintRecord(long numRecords, int key, String value, String topic, int partition, long offset) {
+        // we only print 10 records when there are 20 or more to send

Review Comment:
   ```suggestion
           // we only print 10 records when there are more than 10 to send
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org