You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/20 18:35:55 UTC
kafka git commit: MINOR: Add more exception information in ProcessorStateManager
Repository: kafka
Updated Branches:
refs/heads/trunk 56c61745d -> 7f4b278c0
MINOR: Add more exception information in ProcessorStateManager
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Damian Guy, Jun Rao
Closes #2276 from guozhangwang/KMinor-exception-message
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7f4b278c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7f4b278c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7f4b278c
Branch: refs/heads/trunk
Commit: 7f4b278c030f6e5cff8caca6428f0e631c8918ec
Parents: 56c6174
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Dec 20 10:35:52 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 20 10:35:52 2016 -0800
----------------------------------------------------------------------
.../streams/processor/internals/ProcessorStateManager.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7f4b278c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 30b84f1..821b260 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -171,7 +171,7 @@ public class ProcessorStateManager {
try {
partitions = restoreConsumer.partitionsFor(topic);
} catch (TimeoutException e) {
- throw new StreamsException(String.format("%s Could not find partition info for topic: %s", logPrefix, topic));
+ throw new StreamsException(String.format("%s Could not fetch partition info for topic: %s before expiration of the configured request timeout", logPrefix, topic));
}
if (partitions == null) {
throw new StreamsException(String.format("%s Could not find partition info for topic: %s", logPrefix, topic));
@@ -209,7 +209,7 @@ public class ProcessorStateManager {
// subscribe to the store's partition
if (!restoreConsumer.subscription().isEmpty()) {
- throw new IllegalStateException(String.format("%s Restore consumer should have not subscribed to any partitions beforehand", logPrefix));
+ throw new IllegalStateException(String.format("%s Restore consumer should have not subscribed to any partitions (%s) beforehand", logPrefix, restoreConsumer.subscription()));
}
TopicPartition storePartition = new TopicPartition(topicName, getPartition(topicName));
restoreConsumer.assign(Collections.singletonList(storePartition));
@@ -245,7 +245,8 @@ public class ProcessorStateManager {
} else if (restoreConsumer.position(storePartition) > endOffset) {
// For a logging enabled changelog (no offset limit),
// the log end offset should not change while restoring since it is only written by this thread.
- throw new IllegalStateException(String.format("%s Log end offset should not change while restoring", logPrefix));
+ throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
+ logPrefix, storePartition, endOffset, restoreConsumer.position(storePartition)));
}
}