You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/22 19:24:36 UTC
kafka git commit: MINOR: log state store restore offsets
Repository: kafka
Updated Branches:
refs/heads/0.10.2 cf310ec48 -> 75df53f4b
MINOR: log state store restore offsets
Add debug logging of the partition, start offset, and end offset used during state store restoration.
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2724 from dguy/log-restore-offsets-0.10.2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75df53f4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75df53f4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75df53f4
Branch: refs/heads/0.10.2
Commit: 75df53f4b3923fee79a7457f1e32abca7e5e98ee
Parents: cf310ec
Author: Damian Guy <da...@gmail.com>
Authored: Wed Mar 22 12:24:33 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 22 12:24:33 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/processor/internals/ProcessorStateManager.java | 4 ++++
.../apache/kafka/streams/integration/ResetIntegrationTest.java | 1 -
2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/75df53f4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index ad16c77..65831a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -219,6 +219,10 @@ public class ProcessorStateManager implements StateManager {
} else {
restoreConsumer.seekToBeginning(singleton(storePartition));
}
+ log.debug("restoring partition {} from offset {} to endOffset {}",
+ storePartition,
+ restoreConsumer.position(storePartition),
+ endOffset);
// restore its state from changelog records
long limit = offsetLimit(storePartition);
http://git-wip-us.apache.org/repos/asf/kafka/blob/75df53f4/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 35a58f0..39889b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,7 +23,6 @@ import kafka.utils.MockTime;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;