You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/22 02:14:07 UTC

kafka git commit: MINOR: log start and end offsets for state store restoration

Repository: kafka
Updated Branches:
  refs/heads/trunk a7e3679d2 -> b3beaebd4


MINOR: log start and end offsets for state store restoration

Debug logging of the start and end offsets used during state store restoration

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2718 from dguy/log-restore-offsets


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3beaebd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3beaebd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3beaebd

Branch: refs/heads/trunk
Commit: b3beaebd4481dc94a36949dbb87e02c28272adb0
Parents: a7e3679
Author: Damian Guy <da...@gmail.com>
Authored: Tue Mar 21 19:13:57 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 21 19:13:57 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/StoreChangelogReader.java      | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b3beaebd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index f95ea4a..0afd6c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -118,8 +118,14 @@ public class StoreChangelogReader implements ChangelogReader {
             for (final StateRestorer restorer : needsRestoring.values()) {
                 if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
                     consumer.seek(restorer.partition(), restorer.checkpoint());
+                    logRestoreOffsets(restorer.partition(),
+                                      restorer.checkpoint(),
+                                      endOffsets.get(restorer.partition()));
                 } else {
                     consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+                    logRestoreOffsets(restorer.partition(),
+                                      consumer.position(restorer.partition()),
+                                      endOffsets.get(restorer.partition()));
                 }
             }
 
@@ -137,6 +143,13 @@ public class StoreChangelogReader implements ChangelogReader {
         }
     }
 
+    private void logRestoreOffsets(final TopicPartition partition, final long checkpoint, final Long aLong) {
+        log.debug("restoring partition {} from offset {} to endOffset {}",
+                  partition,
+                  checkpoint,
+                  aLong);
+    }
+
     @Override
     public Map<TopicPartition, Long> restoredOffsets() {
         final Map<TopicPartition, Long> restoredOffsets = new HashMap<>();