You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/22 02:14:07 UTC
kafka git commit: MINOR: log start and end offsets for state store
restoration
Repository: kafka
Updated Branches:
refs/heads/trunk a7e3679d2 -> b3beaebd4
MINOR: log start and end offsets for state store restoration
Debug loggin of the start and end offsets used during state store restoration
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2718 from dguy/log-restore-offsets
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3beaebd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3beaebd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3beaebd
Branch: refs/heads/trunk
Commit: b3beaebd4481dc94a36949dbb87e02c28272adb0
Parents: a7e3679
Author: Damian Guy <da...@gmail.com>
Authored: Tue Mar 21 19:13:57 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 21 19:13:57 2017 -0700
----------------------------------------------------------------------
.../processor/internals/StoreChangelogReader.java | 13 +++++++++++++
1 file changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3beaebd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index f95ea4a..0afd6c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -118,8 +118,14 @@ public class StoreChangelogReader implements ChangelogReader {
for (final StateRestorer restorer : needsRestoring.values()) {
if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
consumer.seek(restorer.partition(), restorer.checkpoint());
+ logRestoreOffsets(restorer.partition(),
+ restorer.checkpoint(),
+ endOffsets.get(restorer.partition()));
} else {
consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+ logRestoreOffsets(restorer.partition(),
+ consumer.position(restorer.partition()),
+ endOffsets.get(restorer.partition()));
}
}
@@ -137,6 +143,13 @@ public class StoreChangelogReader implements ChangelogReader {
}
}
+ private void logRestoreOffsets(final TopicPartition partition, final long checkpoint, final Long aLong) {
+ log.debug("restoring partition {} from offset {} to endOffset {}",
+ partition,
+ checkpoint,
+ aLong);
+ }
+
@Override
public Map<TopicPartition, Long> restoredOffsets() {
final Map<TopicPartition, Long> restoredOffsets = new HashMap<>();