You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/11/06 10:31:09 UTC

[kafka] branch 2.4 updated: KAFKA-9079: Fix reset logic in transactional message copier

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new fba3e7e  KAFKA-9079: Fix reset logic in transactional message copier
fba3e7e is described below

commit fba3e7e1f9c88bf5026239c876a5278510e5e445
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Nov 6 15:59:51 2019 +0530

    KAFKA-9079: Fix reset logic in transactional message copier
    
    The consumer's `committed` API does not return an entry in the response map for a requested partition if there is no committed offset. The transactional message copier, which is used in the transaction system test, did not account for this. If the first transaction attempted by the copier was randomly aborted, then we would not seek to the beginning as expected, which means we would fail to copy some of the records.
    
    This patch fixes the problem by iterating over the assignment rather than the result of `committed` when resetting offsets. It also adds enables additional logging in the transaction message copier service to make finding problems easier in the future.
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
    
    Closes #7653 from hachikuji/fix-transaction-system-test
    
    (cherry picked from commit 903d66e2f9220fc8a016f68b798ef11930372557)
    Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
 tests/kafkatest/services/transactional_message_copier.py               | 3 ++-
 .../main/java/org/apache/kafka/tools/TransactionalMessageCopier.java   | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py
index 53fffa4..1a6a34c 100644
--- a/tests/kafkatest/services/transactional_message_copier.py
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -63,7 +63,8 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
         self.stop_timeout_sec = 60
         self.enable_random_aborts = enable_random_aborts
         self.loggers = {
-            "org.apache.kafka.clients.producer.internals": "TRACE"
+            "org.apache.kafka.clients.producer": "TRACE",
+            "org.apache.kafka.clients.consumer": "TRACE"
         }
 
     def _worker(self, idx, node):
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index cfbac1a..746b8fe 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -196,7 +196,8 @@ public class TransactionalMessageCopier {
 
     private static void resetToLastCommittedPositions(KafkaConsumer<String, String> consumer) {
         final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
-        committed.forEach((tp, offsetAndMetadata) -> {
+        consumer.assignment().forEach(tp -> {
+            OffsetAndMetadata offsetAndMetadata = committed.get(tp);
             if (offsetAndMetadata != null)
                 consumer.seek(tp, offsetAndMetadata.offset());
             else