You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/06/21 20:28:14 UTC
[kafka] branch 2.5 updated: KAFKA-10185: Restoration info logging (#8896) (#8904)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 9ba7137 KAFKA-10185: Restoration info logging (#8896) (#8904)
9ba7137 is described below
commit 9ba713759a6986cf1e160f3839e73d9df400f112
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sun Jun 21 15:27:14 2020 -0500
KAFKA-10185: Restoration info logging (#8896) (#8904)
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../processor/internals/StoreChangelogReader.java | 36 ++++++++++++++++++++++
1 file changed, 36 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index b8be4bc..61a6877 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.Set;
public class StoreChangelogReader implements ChangelogReader {
+ private static final long RESTORE_LOG_INTERVAL_MS = 10_000L;
private final Logger log;
private final Consumer<byte[], byte[]> restoreConsumer;
@@ -53,6 +54,8 @@ public class StoreChangelogReader implements ChangelogReader {
private final Set<TopicPartition> completedRestorers = new HashSet<>();
private final Duration pollTime;
+ private long lastRestoreLogTime = 0L;
+
public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
final Duration pollTime,
final StateRestoreListener userStateRestoreListener,
@@ -120,9 +123,42 @@ public class StoreChangelogReader implements ChangelogReader {
checkForCompletedRestoration();
+ maybeLogRestorationProgress();
+
return completedRestorers;
}
+ private void maybeLogRestorationProgress() {
+ if (needsRestoring.isEmpty()) {
+ lastRestoreLogTime = 0L;
+ } else {
+ final long now = System.currentTimeMillis();
+ if (now - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
+ final Set<TopicPartition> topicPartitions = needsRestoring;
+ if (!topicPartitions.isEmpty()) {
+ final StringBuilder builder = new StringBuilder().append("Restoration in progress for ")
+ .append(topicPartitions.size())
+ .append(" partitions.");
+ for (final TopicPartition partition : topicPartitions) {
+ final StateRestorer stateRestorer = stateRestorers.get(partition);
+ builder.append(" {")
+ .append(partition)
+ .append(": ")
+ .append("position=")
+ .append(stateRestorer.restoredOffset())
+ .append(", end=")
+ .append(restoreToOffsets.get(partition))
+ .append(", totalRestored=")
+ .append(stateRestorer.restoredNumRecords())
+ .append("}");
+ }
+ log.info(builder.toString());
+ lastRestoreLogTime = now;
+ }
+ }
+ }
+ }
+
private void initialize(final RestoringTasks active) {
if (!restoreConsumer.subscription().isEmpty()) {
throw new StreamsException("Restore consumer should not be subscribed to any topics (" + restoreConsumer.subscription() + ")");