You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/20 18:35:55 UTC

kafka git commit: MINOR: Add more exception information in ProcessorStateManager

Repository: kafka
Updated Branches:
  refs/heads/trunk 56c61745d -> 7f4b278c0


MINOR: Add more exception information in ProcessorStateManager

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Damian Guy, Jun Rao

Closes #2276 from guozhangwang/KMinor-exception-message


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7f4b278c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7f4b278c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7f4b278c

Branch: refs/heads/trunk
Commit: 7f4b278c030f6e5cff8caca6428f0e631c8918ec
Parents: 56c6174
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Dec 20 10:35:52 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 20 10:35:52 2016 -0800

----------------------------------------------------------------------
 .../streams/processor/internals/ProcessorStateManager.java    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7f4b278c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 30b84f1..821b260 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -171,7 +171,7 @@ public class ProcessorStateManager {
             try {
                 partitions = restoreConsumer.partitionsFor(topic);
             } catch (TimeoutException e) {
-                throw new StreamsException(String.format("%s Could not find partition info for topic: %s", logPrefix, topic));
+                throw new StreamsException(String.format("%s Could not fetch partition info for topic: %s before expiration of the configured request timeout", logPrefix, topic));
             }
             if (partitions == null) {
                 throw new StreamsException(String.format("%s Could not find partition info for topic: %s", logPrefix, topic));
@@ -209,7 +209,7 @@ public class ProcessorStateManager {
 
         // subscribe to the store's partition
         if (!restoreConsumer.subscription().isEmpty()) {
-            throw new IllegalStateException(String.format("%s Restore consumer should have not subscribed to any partitions beforehand", logPrefix));
+            throw new IllegalStateException(String.format("%s Restore consumer should have not subscribed to any partitions (%s) beforehand", logPrefix, restoreConsumer.subscription()));
         }
         TopicPartition storePartition = new TopicPartition(topicName, getPartition(topicName));
         restoreConsumer.assign(Collections.singletonList(storePartition));
@@ -245,7 +245,8 @@ public class ProcessorStateManager {
                 } else if (restoreConsumer.position(storePartition) > endOffset) {
                     // For a logging enabled changelog (no offset limit),
                     // the log end offset should not change while restoring since it is only written by this thread.
-                    throw new IllegalStateException(String.format("%s Log end offset should not change while restoring", logPrefix));
+                    throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
+                            logPrefix, storePartition, endOffset, restoreConsumer.position(storePartition)));
                 }
             }