Posted to jira@kafka.apache.org by "fvaleri (via GitHub)" <gi...@apache.org> on 2023/04/06 13:44:39 UTC

[GitHub] [kafka] fvaleri opened a new pull request, #13516: KAFKA-14752: Kafka examples improvements - processor changes

fvaleri opened a new pull request, #13516:
URL: https://github.com/apache/kafka/pull/13516

   This is extracted from the original PR (https://github.com/apache/kafka/pull/13492) to make the review easier.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1529639335


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
+        }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
 
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
 
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }
 
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
-                }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
-            }
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        Utils.printOut("Lost partitions: %s", partitions);
+    }
 
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-
-        printWithTxnId("Finished processing " + messageProcessed + " records");
-        latch.countDown();
     }
 
-    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition topicPartition : consumer.assignment()) {
             offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
         }
         return offsets;
     }
 
-    private void printWithTxnId(final String message) {
-        System.out.println(transactionalId + ": " + message);
-    }
-
-    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
-        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
-        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
-    }
-
-    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
-        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        // if we can't detect any end offset, that means we are still not able to fetch offsets
         if (fullEndOffsets.isEmpty()) {
             return Long.MAX_VALUE;
         }
-
         return consumer.assignment().stream().mapToLong(partition -> {
             long currentPosition = consumer.position(partition);
-            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
             if (fullEndOffsets.containsKey(partition)) {
                 return fullEndOffsets.get(partition) - currentPosition;
+            } else {
+                return 0;
             }
-            return 0;
         }).sum();
     }
-
-    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {

Review Comment:
   I did some tests with `max.poll.records` set to 10. I aborted the transaction randomly, but the consumer didn't re-poll the aborted messages automatically; I think this doesn't match exactly-once semantics. Maybe we also need to seek the topics back to the LSO manually when aborting transactions, WDYT?
   
   Test code snippet
   ```
   UniformRandomProvider rng = RandomSource.XO_RO_SHI_RO_128_PP.create();
   while (!closed && remainingRecords > 0) {
       ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
   
       if (!records.isEmpty()) {
           // begin a new transaction session
           producer.beginTransaction();
   
           for (ConsumerRecord<Integer, String> record : records) {
               // process the record and send downstream
               ProducerRecord<Integer, String> newRecord =
                   new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
               producer.send(newRecord);
           }
   
           // checkpoint the progress by sending offsets to group coordinator broker
           // note that this API is only available for broker >= 2.5
           producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
   
           // commit the transaction including offsets
           if (rng.nextInt(10) < 5) {
               producer.abortTransaction();
           } else {
               producer.commitTransaction();
           }
           processedRecords += records.count();
       }
   }
   ```
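
   For reference, a minimal sketch (not part of this PR) of how the abort path could rewind the consumer to its last committed offsets so an aborted batch is polled and processed again; `rewindToLastCommitted` is a hypothetical helper name and the call would go right after `producer.abortTransaction()`:
   ```
   // hypothetical helper: rewind to the last committed offsets after an abort
   // so the records of the aborted transaction are re-polled on the next iteration
   private static void rewindToLastCommitted(KafkaConsumer<Integer, String> consumer) {
       Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
       for (TopicPartition tp : consumer.assignment()) {
           OffsetAndMetadata offset = committed.get(tp);
           if (offset != null) {
               consumer.seek(tp, offset.offset());
           } else {
               // no committed offset yet: fall back to the beginning of the partition
               consumer.seekToBeginning(java.util.Collections.singleton(tp));
           }
       }
   }
   ```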





[GitHub] [kafka] showuon commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1190880657


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
+
+            // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+            boolean readCommitted = true;
+            KafkaConsumer<Integer, String> consumer = new Consumer(
+                "processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+                    .createKafkaConsumer();
+
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
-
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
-
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
-
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {

Review Comment:
   Good to me.





[GitHub] [kafka] showuon merged pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon merged PR #13516:
URL: https://github.com/apache/kafka/pull/13516




[GitHub] [kafka] showuon commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1188380635


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
+
+            // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+            boolean readCommitted = true;
+            KafkaConsumer<Integer, String> consumer = new Consumer(
+                "processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+                    .createKafkaConsumer();
+
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
-
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
-
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
-
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    Utils.printErr(e.getMessage());
+                    // we can't recover from these exceptions
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
                 }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
             }
-
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }

Review Comment:
   Please also add `onPartitionsLost`
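
   For example, mirroring the logging style of the other two callbacks (this is essentially what the updated diff elsewhere in this thread adds):
   ```
   @Override
   public void onPartitionsLost(Collection<TopicPartition> partitions) {
       // called when partitions are reassigned to other consumers without a graceful revocation
       Utils.printOut("Lost partitions: %s", partitions);
   }
   ```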



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
+
+            // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+            boolean readCommitted = true;
+            KafkaConsumer<Integer, String> consumer = new Consumer(
+                "processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+                    .createKafkaConsumer();
+
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
-
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
-
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
-
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {

Review Comment:
   Might need to be consistent with the producer change (i.e. check `instanceof RetriableException`)
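
   As a rough sketch of that alignment (assuming `org.apache.kafka.common.errors.RetriableException` is imported; the exact split between fatal and retriable errors is up to the example):
   ```
   } catch (KafkaException e) {
       if (e instanceof RetriableException) {
           // transient error: abort the transaction and let the loop retry the batch
           Utils.printOut("Retriable error, aborting transaction: %s", e);
           producer.abortTransaction();
       } else {
           // treat anything else as fatal for this example
           Utils.printErr(e.getMessage());
           shutdown();
       }
   }
   ```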



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();

Review Comment:
   This might need to be updated?



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
+
+            // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+            boolean readCommitted = true;
+            KafkaConsumer<Integer, String> consumer = new Consumer(

Review Comment:
   When do we close this consumer and producer?
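
   One option, which the updated version of this patch shown later in the thread uses, is try-with-resources, so both clients are closed even when the loop exits with an exception:
   ```
   try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
            true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
        KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
            "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
       // consume-process-produce loop
   } // both producer and consumer are closed here
   ```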



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -20,56 +20,63 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singleton;
 
 /**
- * A demo class for how to write a customized EOS app. It takes a consume-process-produce loop.
- * Important configurations and APIs are commented.
+ * This class implements a read-process-write application.
  */
-public class ExactlyOnceMessageProcessor extends Thread {
-
-    private static final boolean READ_COMMITTED = true;
-
+public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener {
+    private final String bootstrapServers;
     private final String inputTopic;
     private final String outputTopic;
-    private final String transactionalId;
     private final String groupInstanceId;
+    private final CountDownLatch latch;
+    private final String transactionalId;
+    private volatile boolean closed;
 
     private final KafkaProducer<Integer, String> producer;
     private final KafkaConsumer<Integer, String> consumer;
 
-    private final CountDownLatch latch;
-
-    public ExactlyOnceMessageProcessor(final String inputTopic,
-                                       final String outputTopic,
-                                       final int instanceIdx,
-                                       final CountDownLatch latch) {
+    public ExactlyOnceMessageProcessor(String threadName,
+                                       String bootstrapServers,
+                                       String inputTopic,
+                                       String outputTopic,
+                                       CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.inputTopic = inputTopic;
         this.outputTopic = outputTopic;
-        this.transactionalId = "Processor-" + instanceIdx;
+        this.transactionalId = threadName;
         // It is recommended to have a relatively short txn timeout in order to clear pending offsets faster.
         final int transactionTimeoutMs = 10000;
         // A unique transactional.id must be provided in order to properly use EOS.
         producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
         // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
         // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
-        this.groupInstanceId = "Txn-consumer-" + instanceIdx;
+        this.groupInstanceId = threadName;

Review Comment:
   I think it makes sense to keep different names for the group instance id and the transactional id. Could we set different values for them?
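
   For illustration, the two ids could both be derived from the thread name while staying distinct (the suffixes here are just an example, not what the PR does):
   ```
   this.transactionalId = threadName + "-producer"; // fences zombie producers using the same id
   this.groupInstanceId = threadName + "-consumer"; // static membership id, avoids unnecessary rebalances
   ```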





Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
+        }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
 
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
 
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }
 
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
-                }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
-            }
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        Utils.printOut("Lost partitions: %s", partitions);
+    }
 
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-
-        printWithTxnId("Finished processing " + messageProcessed + " records");
-        latch.countDown();
     }
 
-    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition topicPartition : consumer.assignment()) {
             offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
         }
         return offsets;
     }
 
-    private void printWithTxnId(final String message) {
-        System.out.println(transactionalId + ": " + message);
-    }
-
-    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
-        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
-        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
-    }
-
-    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
-        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        // if we can't detect any end offset, that means we are still not able to fetch offsets
         if (fullEndOffsets.isEmpty()) {
             return Long.MAX_VALUE;
         }
-
         return consumer.assignment().stream().mapToLong(partition -> {
             long currentPosition = consumer.position(partition);
-            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
             if (fullEndOffsets.containsKey(partition)) {
                 return fullEndOffsets.get(partition) - currentPosition;
+            } else {
+                return 0;
             }
-            return 0;
         }).sum();
     }
-
-    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {

Review Comment:
   Sorry, why do we need to remove this? We don't need to reset the offset manually, right?



-    }
-
-    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
-        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        // if we can't detect any end offset, that means we are still not able to fetch offsets
         if (fullEndOffsets.isEmpty()) {
             return Long.MAX_VALUE;
         }
-
         return consumer.assignment().stream().mapToLong(partition -> {
             long currentPosition = consumer.position(partition);
-            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
             if (fullEndOffsets.containsKey(partition)) {
                 return fullEndOffsets.get(partition) - currentPosition;
+            } else {
+                return 0;
             }
-            return 0;
         }).sum();
     }
-
-    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {

Review Comment:
   Sorry, I have a question. Why remove this? We don't need to reset the offset manually, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1531455051


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
+        }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
 
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
 
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }
 
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
-                }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
-            }
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        Utils.printOut("Lost partitions: %s", partitions);
+    }
 
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-
-        printWithTxnId("Finished processing " + messageProcessed + " records");
-        latch.countDown();
     }
 
-    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition topicPartition : consumer.assignment()) {
             offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
         }
         return offsets;
     }
 
-    private void printWithTxnId(final String message) {
-        System.out.println(transactionalId + ": " + message);
-    }
-
-    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
-        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
-        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
-    }
-
-    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
-        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        // if we can't detect any end offset, that means we are still not able to fetch offsets
         if (fullEndOffsets.isEmpty()) {
             return Long.MAX_VALUE;
         }
-
         return consumer.assignment().stream().mapToLong(partition -> {
             long currentPosition = consumer.position(partition);
-            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
             if (fullEndOffsets.containsKey(partition)) {
                 return fullEndOffsets.get(partition) - currentPosition;
+            } else {
+                return 0;
             }
-            return 0;
         }).sum();
     }
-
-    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {

Review Comment:
   > In a real world application you could send failed records to a DLT (dead letter topic) for further processing.
   
   Yes, commonly we can retry a few times (for example, five); if a record still fails to process, we can send it to the DLT.
   
   > Here we can simply log them and/or add a comment.
   
   Adding a comment would be better; otherwise, users may be confused.
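   
   For illustration only, a minimal sketch of the retry-then-DLT idea discussed here, assuming a hypothetical "dlt-topic" and a fixed retry budget; it is not part of the example code in this PR:
   
   ```java
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   
   public class DltRoutingSketch {
       private static final int MAX_RETRIES = 5;            // assumed retry budget
       private static final String DLT_TOPIC = "dlt-topic"; // assumed dead letter topic name
   
       // try to process and forward a record; after MAX_RETRIES failures, route it to the DLT
       static void processOrRouteToDlt(KafkaProducer<Integer, String> producer,
                                       ConsumerRecord<Integer, String> record,
                                       String outputTopic) {
           for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
               try {
                   String transformed = transform(record.value()); // may throw on bad input
                   producer.send(new ProducerRecord<>(outputTopic, record.key(), transformed));
                   return;
               } catch (RuntimeException e) {
                   if (attempt == MAX_RETRIES) {
                       // give up: forward the original payload to the dead letter topic instead
                       producer.send(new ProducerRecord<>(DLT_TOPIC, record.key(), record.value()));
                   }
               }
           }
       }
   
       private static String transform(String value) {
           return value + "-ok"; // placeholder for real, possibly failing, processing logic
       }
   }
   ```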



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1188609885


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
+
+            // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+            boolean readCommitted = true;
+            KafkaConsumer<Integer, String> consumer = new Consumer(
+                "processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+                    .createKafkaConsumer();
+
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
-
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
-
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
-
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {

Review Comment:
   Well, this is consistent with the consumer. The exception list here is shorter and much more compact (no repeated instanceof). If you don't mind, I would keep it as it is.
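   
   For illustration only, a small sketch of the two styles being compared (a multi-catch listing the fatal exceptions versus one broad catch with repeated instanceof checks); the exception types are the ones used in the diff above, the rest is hypothetical:
   
   ```java
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.errors.AuthorizationException;
   import org.apache.kafka.common.errors.FencedInstanceIdException;
   import org.apache.kafka.common.errors.ProducerFencedException;
   
   public class CatchStyleSketch {
       // style used in the PR: one multi-catch listing the fatal exceptions
       static void multiCatch(Runnable work) {
           try {
               work.run();
           } catch (AuthorizationException | ProducerFencedException | FencedInstanceIdException e) {
               System.err.println("fatal, shutting down: " + e.getMessage());
           } catch (KafkaException e) {
               System.out.println("recoverable, aborting transaction: " + e);
           }
       }
   
       // alternative style: one broad catch with repeated instanceof checks
       static void instanceOfChecks(Runnable work) {
           try {
               work.run();
           } catch (KafkaException e) {
               if (e instanceof AuthorizationException
                       || e instanceof ProducerFencedException
                       || e instanceof FencedInstanceIdException) {
                   System.err.println("fatal, shutting down: " + e.getMessage());
               } else {
                   System.out.println("recoverable, aborting transaction: " + e);
               }
           }
       }
   }
   ```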



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
+
+            // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+            boolean readCommitted = true;
+            KafkaConsumer<Integer, String> consumer = new Consumer(

Review Comment:
   Thanks for catching this. I think I lost that while refactoring.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1531939832


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
+        }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
 
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
 
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }
 
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
-                }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
-            }
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        Utils.printOut("Lost partitions: %s", partitions);
+    }
 
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-
-        printWithTxnId("Finished processing " + messageProcessed + " records");
-        latch.countDown();
     }
 
-    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition topicPartition : consumer.assignment()) {
             offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
         }
         return offsets;
     }
 
-    private void printWithTxnId(final String message) {
-        System.out.println(transactionalId + ": " + message);
-    }
-
-    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
-        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
-        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
-    }
-
-    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
-        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        // if we can't detect any end offset, that means we are still not able to fetch offsets
         if (fullEndOffsets.isEmpty()) {
             return Long.MAX_VALUE;
         }
-
         return consumer.assignment().stream().mapToLong(partition -> {
             long currentPosition = consumer.position(partition);
-            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
             if (fullEndOffsets.containsKey(partition)) {
                 return fullEndOffsets.get(partition) - currentPosition;
+            } else {
+                return 0;
             }
-            return 0;
         }).sum();
     }
-
-    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {

Review Comment:
   Opened https://github.com/apache/kafka/pull/15561, let's see what others think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
+        }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
 
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
 
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }
 
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
-                }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
-            }
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        Utils.printOut("Lost partitions: %s", partitions);
+    }
 
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-
-        printWithTxnId("Finished processing " + messageProcessed + " records");
-        latch.countDown();
     }
 
-    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition topicPartition : consumer.assignment()) {
             offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
         }
         return offsets;
     }
 
-    private void printWithTxnId(final String message) {
-        System.out.println(transactionalId + ": " + message);
-    }
-
-    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
-        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
-        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
-    }
-
-    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
-        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        // if we can't detect any end offset, that means we are still not able to fetch offsets
         if (fullEndOffsets.isEmpty()) {
             return Long.MAX_VALUE;
         }
-
         return consumer.assignment().stream().mapToLong(partition -> {
             long currentPosition = consumer.position(partition);
-            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
             if (fullEndOffsets.containsKey(partition)) {
                 return fullEndOffsets.get(partition) - currentPosition;
+            } else {
+                return 0;
             }
-            return 0;
         }).sum();
     }
-
-    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {

Review Comment:
   Sorry, I have a question. Why do we need to remove this? We don't need to reset the offset manually, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1528843407


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
+        }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
 
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
 
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }
 
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
-                }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
-            }
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        Utils.printOut("Lost partitions: %s", partitions);
+    }
 
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-
-        printWithTxnId("Finished processing " + messageProcessed + " records");
-        latch.countDown();
     }
 
-    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition topicPartition : consumer.assignment()) {
             offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
         }
         return offsets;
     }
 
-    private void printWithTxnId(final String message) {
-        System.out.println(transactionalId + ": " + message);
-    }
-
-    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
-        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
-        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
-    }
-
-    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
-        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        // if we can't detect any end offset, that means we are still not able to fetch offsets
         if (fullEndOffsets.isEmpty()) {
             return Long.MAX_VALUE;
         }
-
         return consumer.assignment().stream().mapToLong(partition -> {
             long currentPosition = consumer.position(partition);
-            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
             if (fullEndOffsets.containsKey(partition)) {
                 return fullEndOffsets.get(partition) - currentPosition;
+            } else {
+                return 0;
             }
-            return 0;
         }).sum();
     }
-
-    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {

Review Comment:
   > We don't need to reset the offset manually, right?
   
   This is my understanding. Here we are subscribing, and the Consumer Javadoc says:
   
   "On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13516:
URL: https://github.com/apache/kafka/pull/13516#issuecomment-1543640638

   Failed tests are unrelated:
   ```
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.testMultipleServerMechanisms()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testRestartReplication()
       Build / JDK 17 and Scala 2.13 / kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback()
       Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testFailureToFenceEpoch(String).quorum=kraft
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[5] Type=Raft-Combined, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV2, Security=PLAINTEXT
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1188609481


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
+
+            // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+            boolean readCommitted = true;
+            KafkaConsumer<Integer, String> consumer = new Consumer(
+                "processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+                    .createKafkaConsumer();
+
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
-
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
-
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
-
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    Utils.printErr(e.getMessage());
+                    // we can't recover from these exceptions
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
                 }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
             }
-
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }

Review Comment:
   Yep.



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -20,56 +20,63 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singleton;
 
 /**
- * A demo class for how to write a customized EOS app. It takes a consume-process-produce loop.
- * Important configurations and APIs are commented.
+ * This class implements a read-process-write application.
  */
-public class ExactlyOnceMessageProcessor extends Thread {
-
-    private static final boolean READ_COMMITTED = true;
-
+public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener {
+    private final String bootstrapServers;
     private final String inputTopic;
     private final String outputTopic;
-    private final String transactionalId;
     private final String groupInstanceId;
+    private final CountDownLatch latch;
+    private final String transactionalId;
+    private volatile boolean closed;
 
     private final KafkaProducer<Integer, String> producer;
     private final KafkaConsumer<Integer, String> consumer;
 
-    private final CountDownLatch latch;
-
-    public ExactlyOnceMessageProcessor(final String inputTopic,
-                                       final String outputTopic,
-                                       final int instanceIdx,
-                                       final CountDownLatch latch) {
+    public ExactlyOnceMessageProcessor(String threadName,
+                                       String bootstrapServers,
+                                       String inputTopic,
+                                       String outputTopic,
+                                       CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.inputTopic = inputTopic;
         this.outputTopic = outputTopic;
-        this.transactionalId = "Processor-" + instanceIdx;
+        this.transactionalId = threadName;
         // It is recommended to have a relatively short txn timeout in order to clear pending offsets faster.
         final int transactionTimeoutMs = 10000;
         // A unique transactional.id must be provided in order to properly use EOS.
         producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
         // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
         // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
-        this.groupInstanceId = "Txn-consumer-" + instanceIdx;
+        this.groupInstanceId = threadName;

Review Comment:
   Sure, this may help while debugging.



##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer<Integer, String> producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();

Review Comment:
   Yep.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1529639335


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
+        }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
 
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
 
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }
 
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
-                }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
-            }
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        Utils.printOut("Lost partitions: %s", partitions);
+    }
 
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-
-        printWithTxnId("Finished processing " + messageProcessed + " records");
-        latch.countDown();
     }
 
-    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition topicPartition : consumer.assignment()) {
             offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
         }
         return offsets;
     }
 
-    private void printWithTxnId(final String message) {
-        System.out.println(transactionalId + ": " + message);
-    }
-
-    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
-        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
-        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
-    }
-
-    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
-        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        // if we can't detect any end offset, that means we are still not able to fetch offsets
         if (fullEndOffsets.isEmpty()) {
             return Long.MAX_VALUE;
         }
-
         return consumer.assignment().stream().mapToLong(partition -> {
             long currentPosition = consumer.position(partition);
-            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
             if (fullEndOffsets.containsKey(partition)) {
                 return fullEndOffsets.get(partition) - currentPosition;
+            } else {
+                return 0;
             }
-            return 0;
         }).sum();
     }
-
-    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {

Review Comment:
   I did some tests with `max.poll.records` set to 10. I tried aborting the transaction randomly, but the consumer didn't poll the aborted records again automatically; I guess this doesn't match exactly-once semantics. Maybe we also need to seek the partitions back to the LSO manually when aborting a transaction, WDYT?
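   
   Something like this in the `KafkaException` branch is roughly what I have in mind, rewinding to the last committed offsets (just a sketch, not tested against the PR):
   
   ```java
   } catch (KafkaException e) {
       // abort the transaction and rewind, so the records from the aborted batch are polled again
       Utils.printOut("Aborting transaction: %s", e);
       producer.abortTransaction();
       Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
       for (TopicPartition partition : consumer.assignment()) {
           OffsetAndMetadata offset = committed.get(partition);
           if (offset != null) {
               // move the fetch position back to the last committed offset
               consumer.seek(partition, offset.offset());
           }
       }
   }
   ```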



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1530075375


##########
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##########
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
+                }
+            }
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
+        }
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
+    }
 
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
 
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }
 
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
-                }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
-            }
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        Utils.printOut("Lost partitions: %s", partitions);
+    }
 
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-
-        printWithTxnId("Finished processing " + messageProcessed + " records");
-        latch.countDown();
     }
 
-    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition topicPartition : consumer.assignment()) {
             offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
         }
         return offsets;
     }
 
-    private void printWithTxnId(final String message) {
-        System.out.println(transactionalId + ": " + message);
-    }
-
-    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
-        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
-        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
-    }
-
-    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
-        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        // if we can't detect any end offset, that means we are still not able to fetch offsets
         if (fullEndOffsets.isEmpty()) {
             return Long.MAX_VALUE;
         }
-
         return consumer.assignment().stream().mapToLong(partition -> {
             long currentPosition = consumer.position(partition);
-            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
             if (fullEndOffsets.containsKey(partition)) {
                 return fullEndOffsets.get(partition) - currentPosition;
+            } else {
+                return 0;
             }
-            return 0;
         }).sum();
     }
-
-    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {

Review Comment:
   I did a similar experiment, alternating commit and abort one record at a time, and you are correct. That said, if the transaction is always aborted (e.g. by a poison pill) the example would not make any progress, while the current logic moves on with the next batch. In a real-world application you could send the failed records to a dead letter topic (DLT) for further processing. Here we can simply log them and/or add a comment, WDYT?
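   
   For example, something along these lines (just a sketch: `records` would have to be hoisted out of the try block and `dltProducer` would be a separate non-transactional producer, neither of which exists in the PR):
   
   ```java
   } catch (KafkaException e) {
       Utils.printOut("Aborting transaction: %s", e);
       producer.abortTransaction();
       // forward the records of the aborted batch to a dead letter topic for later analysis
       for (ConsumerRecord<Integer, String> record : records) {
           dltProducer.send(new ProducerRecord<>("processor-dlt", record.key(), record.value()));
       }
   }
   ```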
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

