You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/04/29 17:02:54 UTC
[kafka] branch trunk updated: MINOR: Fix event output inconsistencies in TransactionalMessageCopier (#12098)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f0a09ea003 MINOR: Fix event output inconsistencies in TransactionalMessageCopier (#12098)
f0a09ea003 is described below
commit f0a09ea0036312b50321caacd2363818a4bc3859
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Apr 29 10:02:25 2022 -0700
MINOR: Fix event output inconsistencies in TransactionalMessageCopier (#12098)
This patch fixes some strangeness and inconsistency in the messages written by `TransactionalMessageCopier` to stdout. Here is a sample of two messages.
Progress message:
```
{"consumed":33000,"stage":"ProcessLoop","totalProcessed":33000,"progress":"copier-0","time":"2022/04/24 05:40:31:649","remaining":333}
```
The `transactionalId` is set to the value of the `progress` key.
And a shutdown message:
```
{"consumed":33333,"shutdown_complete":"copier-0","totalProcessed":33333,"time":"2022/04/24 05:40:31:937","remaining":0}
```
The `transactionalId` this time is set to the `shutdown_complete` key and there is no `stage` key.
In this patch, we change the following:
1. Use a separate key for the `transactionalId`.
2. Drop the `progress` and `shutdown_complete` keys.
3. Use `stage=ShutdownComplete` in the shutdown message.
4. Modify `transactional_message_copier.py` system test service accordingly.
Reviewers: David Arthur <mu...@gmail.com>
---
.../services/transactional_message_copier.py | 2 +-
.../kafka/tools/TransactionalMessageCopier.java | 54 +++++++++++++---------
2 files changed, 34 insertions(+), 22 deletions(-)
diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py
index 0717463f37..675c7d7153 100644
--- a/tests/kafkatest/services/transactional_message_copier.py
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -93,7 +93,7 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
self.consumed = int(data["consumed"])
self.logger.info("%s: consumed %d, remaining %d" %
(self.transactional_id, self.consumed, self.remaining))
- if "shutdown_complete" in data:
+ if data["stage"] == "ShutdownComplete":
if self.remaining == 0:
# We are only finished if the remaining
# messages at the time of shutdown is 0.
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 18c80996f2..289327e570 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -47,6 +47,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
@@ -260,27 +261,23 @@ public class TransactionalMessageCopier {
return json;
}
- private static synchronized String statusAsJson(long totalProcessed, long consumedSinceLastRebalanced, long remaining, String transactionalId, String stage) {
- Map<String, Object> statusData = new HashMap<>();
- statusData.put("progress", transactionalId);
+ private static synchronized String statusAsJson(
+ String stage,
+ long totalProcessed,
+ long consumedSinceLastRebalanced,
+ long remaining,
+ String transactionalId
+ ) {
+ Map<String, Object> statusData = new LinkedHashMap<>();
+ statusData.put("transactionalId", transactionalId);
+ statusData.put("stage", stage);
+ statusData.put("time", FORMAT.format(new Date()));
statusData.put("totalProcessed", totalProcessed);
statusData.put("consumed", consumedSinceLastRebalanced);
statusData.put("remaining", remaining);
- statusData.put("time", FORMAT.format(new Date()));
- statusData.put("stage", stage);
return toJsonString(statusData);
}
- private static synchronized String shutDownString(long totalProcessed, long consumedSinceLastRebalanced, long remaining, String transactionalId) {
- Map<String, Object> shutdownData = new HashMap<>();
- shutdownData.put("shutdown_complete", transactionalId);
- shutdownData.put("totalProcessed", totalProcessed);
- shutdownData.put("consumed", consumedSinceLastRebalanced);
- shutdownData.put("remaining", remaining);
- shutdownData.put("time", FORMAT.format(new Date()));
- return toJsonString(shutdownData);
- }
-
private static void abortTransactionAndResetPosition(
KafkaProducer<String, String> producer,
KafkaConsumer<String, String> consumer
@@ -330,8 +327,13 @@ public class TransactionalMessageCopier {
.mapToLong(partition -> messagesRemaining(consumer, partition)).sum());
numMessagesProcessedSinceLastRebalance.set(0);
// We use message cap for remaining here as the remainingMessages are not set yet.
- System.out.println(statusAsJson(totalMessageProcessed.get(),
- numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, "RebalanceComplete"));
+ System.out.println(statusAsJson(
+ "RebalanceComplete",
+ totalMessageProcessed.get(),
+ numMessagesProcessedSinceLastRebalance.get(),
+ remainingMessages.get(),
+ transactionalId
+ ));
}
});
} else {
@@ -349,16 +351,26 @@ public class TransactionalMessageCopier {
Exit.addShutdownHook("transactional-message-copier-shutdown-hook", () -> {
isShuttingDown.set(true);
consumer.wakeup();
- System.out.println(shutDownString(totalMessageProcessed.get(),
- numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId));
+ System.out.println(statusAsJson(
+ "ShutdownComplete",
+ totalMessageProcessed.get(),
+ numMessagesProcessedSinceLastRebalance.get(),
+ remainingMessages.get(),
+ transactionalId
+ ));
});
final boolean useGroupMetadata = parsedArgs.getBoolean("useGroupMetadata");
try {
Random random = new Random();
while (!isShuttingDown.get() && remainingMessages.get() > 0) {
- System.out.println(statusAsJson(totalMessageProcessed.get(),
- numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, "ProcessLoop"));
+ System.out.println(statusAsJson(
+ "ProcessLoop",
+ totalMessageProcessed.get(),
+ numMessagesProcessedSinceLastRebalance.get(),
+ remainingMessages.get(),
+ transactionalId
+ ));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
if (records.count() > 0) {