You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/06/21 20:28:14 UTC

[kafka] branch 2.5 updated: KAFKA-10185: Restoration info logging (#8896) (#8904)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 9ba7137  KAFKA-10185: Restoration info logging (#8896) (#8904)
9ba7137 is described below

commit 9ba713759a6986cf1e160f3839e73d9df400f112
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sun Jun 21 15:27:14 2020 -0500

    KAFKA-10185: Restoration info logging (#8896) (#8904)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../processor/internals/StoreChangelogReader.java  | 36 ++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index b8be4bc..61a6877 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.Set;
 
 public class StoreChangelogReader implements ChangelogReader {
+    private static final long RESTORE_LOG_INTERVAL_MS = 10_000L;
 
     private final Logger log;
     private final Consumer<byte[], byte[]> restoreConsumer;
@@ -53,6 +54,8 @@ public class StoreChangelogReader implements ChangelogReader {
     private final Set<TopicPartition> completedRestorers = new HashSet<>();
     private final Duration pollTime;
 
+    private long lastRestoreLogTime = 0L;
+
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
                                 final Duration pollTime,
                                 final StateRestoreListener userStateRestoreListener,
@@ -120,9 +123,42 @@ public class StoreChangelogReader implements ChangelogReader {
 
         checkForCompletedRestoration();
 
+        maybeLogRestorationProgress();
+
         return completedRestorers;
     }
 
+    /**
+     * Emits an INFO-level summary of every partition still being restored, but only when more
+     * than {@code RESTORE_LOG_INTERVAL_MS} has elapsed since the previous summary.
+     *
+     * When {@code needsRestoring} is empty the timestamp of the previous summary is reset to 0,
+     * so that the interval check starts over the next time partitions need restoring.
+     */
+    private void maybeLogRestorationProgress() {
+        if (needsRestoring.isEmpty()) {
+            lastRestoreLogTime = 0L;
+            return;
+        }
+        final long currentTimeMs = System.currentTimeMillis();
+        if (currentTimeMs - lastRestoreLogTime <= RESTORE_LOG_INTERVAL_MS) {
+            return;
+        }
+        final StringBuilder message = new StringBuilder("Restoration in progress for ");
+        message.append(needsRestoring.size()).append(" partitions.");
+        for (final TopicPartition tp : needsRestoring) {
+            final StateRestorer restorer = stateRestorers.get(tp);
+            // One "{partition: position=..., end=..., totalRestored=...}" entry per partition.
+            message.append(" {").append(tp).append(": ")
+                   .append("position=").append(restorer.restoredOffset())
+                   .append(", end=").append(restoreToOffsets.get(tp))
+                   .append(", totalRestored=").append(restorer.restoredNumRecords())
+                   .append("}");
+        }
+        log.info(message.toString());
+        lastRestoreLogTime = currentTimeMs;
+    }
+
     private void initialize(final RestoringTasks active) {
         if (!restoreConsumer.subscription().isEmpty()) {
             throw new StreamsException("Restore consumer should not be subscribed to any topics (" + restoreConsumer.subscription() + ")");