You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/04/29 17:02:54 UTC

[kafka] branch trunk updated: MINOR: Fix event output inconsistencies in TransactionalMessageCopier (#12098)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f0a09ea003 MINOR: Fix event output inconsistencies in TransactionalMessageCopier (#12098)
f0a09ea003 is described below

commit f0a09ea0036312b50321caacd2363818a4bc3859
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Apr 29 10:02:25 2022 -0700

    MINOR: Fix event output inconsistencies in TransactionalMessageCopier (#12098)
    
    This patch fixes some strangeness and inconsistency in the messages written by `TransactionalMessageCopier` to stdout. Here is a sample of two messages.
    
    Progress message:
    ```
    {"consumed":33000,"stage":"ProcessLoop","totalProcessed":33000,"progress":"copier-0","time":"2022/04/24 05:40:31:649","remaining":333}
    ```
    The `transactionalId` is stored as the value of the `progress` key.
    
    And a shutdown message:
    ```
    {"consumed":33333,"shutdown_complete":"copier-0","totalProcessed":33333,"time":"2022/04/24 05:40:31:937","remaining":0}
    ```
    This time the `transactionalId` is stored as the value of the `shutdown_complete` key, and there is no `stage` key.
    
    In this patch, we change the following:
    
    1. Use a separate key for the `transactionalId`.
    2. Drop the `progress` and `shutdown_complete` keys.
    3. Use `stage=ShutdownComplete` in the shutdown message.
    4. Modify `transactional_message_copier.py` system test service accordingly.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../services/transactional_message_copier.py       |  2 +-
 .../kafka/tools/TransactionalMessageCopier.java    | 54 +++++++++++++---------
 2 files changed, 34 insertions(+), 22 deletions(-)

diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py
index 0717463f37..675c7d7153 100644
--- a/tests/kafkatest/services/transactional_message_copier.py
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -93,7 +93,7 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
                         self.consumed = int(data["consumed"])
                         self.logger.info("%s: consumed %d, remaining %d" %
                                          (self.transactional_id, self.consumed, self.remaining))
-                        if "shutdown_complete" in data:
+                        if data["stage"] == "ShutdownComplete":
                            if self.remaining == 0:
                                 # We are only finished if the remaining
                                 # messages at the time of shutdown is 0.
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 18c80996f2..289327e570 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -47,6 +47,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
@@ -260,27 +261,23 @@ public class TransactionalMessageCopier {
         return json;
     }
 
-    private static synchronized String statusAsJson(long totalProcessed, long consumedSinceLastRebalanced, long remaining, String transactionalId, String stage) {
-        Map<String, Object> statusData = new HashMap<>();
-        statusData.put("progress", transactionalId);
+    private static synchronized String statusAsJson(
+        String stage,
+        long totalProcessed,
+        long consumedSinceLastRebalanced,
+        long remaining,
+        String transactionalId
+    ) {
+        Map<String, Object> statusData = new LinkedHashMap<>();
+        statusData.put("transactionalId", transactionalId);
+        statusData.put("stage", stage);
+        statusData.put("time", FORMAT.format(new Date()));
         statusData.put("totalProcessed", totalProcessed);
         statusData.put("consumed", consumedSinceLastRebalanced);
         statusData.put("remaining", remaining);
-        statusData.put("time", FORMAT.format(new Date()));
-        statusData.put("stage", stage);
         return toJsonString(statusData);
     }
 
-    private static synchronized String shutDownString(long totalProcessed, long consumedSinceLastRebalanced, long remaining, String transactionalId) {
-        Map<String, Object> shutdownData = new HashMap<>();
-        shutdownData.put("shutdown_complete", transactionalId);
-        shutdownData.put("totalProcessed", totalProcessed);
-        shutdownData.put("consumed", consumedSinceLastRebalanced);
-        shutdownData.put("remaining", remaining);
-        shutdownData.put("time", FORMAT.format(new Date()));
-        return toJsonString(shutdownData);
-    }
-
     private static void abortTransactionAndResetPosition(
         KafkaProducer<String, String> producer,
         KafkaConsumer<String, String> consumer
@@ -330,8 +327,13 @@ public class TransactionalMessageCopier {
                         .mapToLong(partition -> messagesRemaining(consumer, partition)).sum());
                     numMessagesProcessedSinceLastRebalance.set(0);
                     // We use message cap for remaining here as the remainingMessages are not set yet.
-                    System.out.println(statusAsJson(totalMessageProcessed.get(),
-                        numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, "RebalanceComplete"));
+                    System.out.println(statusAsJson(
+                        "RebalanceComplete",
+                        totalMessageProcessed.get(),
+                        numMessagesProcessedSinceLastRebalance.get(),
+                        remainingMessages.get(),
+                        transactionalId
+                    ));
                 }
             });
         } else {
@@ -349,16 +351,26 @@ public class TransactionalMessageCopier {
         Exit.addShutdownHook("transactional-message-copier-shutdown-hook", () -> {
             isShuttingDown.set(true);
             consumer.wakeup();
-            System.out.println(shutDownString(totalMessageProcessed.get(),
-                numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId));
+            System.out.println(statusAsJson(
+                "ShutdownComplete",
+                totalMessageProcessed.get(),
+                numMessagesProcessedSinceLastRebalance.get(),
+                remainingMessages.get(),
+                transactionalId
+            ));
         });
 
         final boolean useGroupMetadata = parsedArgs.getBoolean("useGroupMetadata");
         try {
             Random random = new Random();
             while (!isShuttingDown.get() && remainingMessages.get() > 0) {
-                System.out.println(statusAsJson(totalMessageProcessed.get(),
-                    numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, "ProcessLoop"));
+                System.out.println(statusAsJson(
+                    "ProcessLoop",
+                    totalMessageProcessed.get(),
+                    numMessagesProcessedSinceLastRebalance.get(),
+                    remainingMessages.get(),
+                    transactionalId
+                ));
 
                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                 if (records.count() > 0) {