You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/22 19:24:36 UTC

kafka git commit: MINOR: log state store restore offsets

Repository: kafka
Updated Branches:
  refs/heads/0.10.2 cf310ec48 -> 75df53f4b


MINOR: log state store restore offsets

Debug logging of the start and end offsets used during state store restoration

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2724 from dguy/log-restore-offsets-0.10.2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75df53f4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75df53f4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75df53f4

Branch: refs/heads/0.10.2
Commit: 75df53f4b3923fee79a7457f1e32abca7e5e98ee
Parents: cf310ec
Author: Damian Guy <da...@gmail.com>
Authored: Wed Mar 22 12:24:33 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 22 12:24:33 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/ProcessorStateManager.java | 4 ++++
 .../apache/kafka/streams/integration/ResetIntegrationTest.java   | 1 -
 2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/75df53f4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index ad16c77..65831a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -219,6 +219,10 @@ public class ProcessorStateManager implements StateManager {
             } else {
                 restoreConsumer.seekToBeginning(singleton(storePartition));
             }
+            log.debug("restoring partition {} from offset {} to endOffset {}",
+                      storePartition,
+                      restoreConsumer.position(storePartition),
+                      endOffset);
 
             // restore its state from changelog records
             long limit = offsetLimit(storePartition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/75df53f4/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 35a58f0..39889b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,7 +23,6 @@ import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;