You are viewing a plain-text version of this content; the canonical link is not preserved in this plain-text version.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/29 09:27:06 UTC

kafka git commit: KAFKA-5949; Follow-up after latest KIP-161 changes

Repository: kafka
Updated Branches:
  refs/heads/trunk eaabb6cd0 -> b79b17971


KAFKA-5949; Follow-up after latest KIP-161 changes

 - see also KAFKA-5958

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>

Closes #3986 from mjsax/kafka-5949-exceptions-user-callbacks-KIP-161-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b79b1797
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b79b1797
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b79b1797

Branch: refs/heads/trunk
Commit: b79b179716b5f8bacb870a53a5a9216a0687b3c9
Parents: eaabb6c
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Sep 29 10:21:57 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Sep 29 10:21:57 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 30 ++++++++++++++++++--
 .../internals/CompositeRestoreListener.java     | 30 ++------------------
 2 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b79b1797/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2f5ce4b..928d0e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -562,21 +562,45 @@ public class KafkaStreams {
             @Override
             public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
                 if (globalStateRestoreListener != null) {
-                    globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+                    try {
+                        globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+                    } catch (final Exception fatalUserException) {
+                        throw new StreamsException(
+                            String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+                                storeName,
+                                topicPartition),
+                            fatalUserException);
+                    }
                 }
             }
 
             @Override
             public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) {
                 if (globalStateRestoreListener != null) {
-                    globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+                    try {
+                        globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+                    } catch (final Exception fatalUserException) {
+                        throw new StreamsException(
+                            String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+                                storeName,
+                                topicPartition),
+                            fatalUserException);
+                    }
                 }
             }
 
             @Override
             public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
                 if (globalStateRestoreListener != null) {
-                    globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+                    try {
+                        globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+                    } catch (final Exception fatalUserException) {
+                        throw new StreamsException(
+                            String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+                                storeName,
+                                topicPartition),
+                            fatalUserException);
+                    }
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/kafka/blob/b79b1797/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
index a1c2f7f..01ba457 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
@@ -55,15 +55,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
                                final String storeName,
                                final long startingOffset,
                                final long endingOffset) {
-        try {
-            userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
-        } catch (final Exception fatalUserException) {
-            throw new StreamsException(
-                String.format("Fatal user code error in store restore listener for store %s, partition %s.",
-                              storeName,
-                              topicPartition),
-                fatalUserException);
-        }
+        userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
         storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
     }
 
@@ -76,15 +68,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
                                 final String storeName,
                                 final long batchEndOffset,
                                 final long numRestored) {
-        try {
-            userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
-        } catch (final Exception fatalUserException) {
-            throw new StreamsException(
-                String.format("Fatal user code error in store restore listener for store %s, partition %s.",
-                    storeName,
-                    topicPartition),
-                fatalUserException);
-        }
+        userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
         storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
     }
 
@@ -96,15 +80,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
     public void onRestoreEnd(final TopicPartition topicPartition,
                              final String storeName,
                              final long totalRestored) {
-        try {
-            userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
-        } catch (final Exception fatalUserException) {
-            throw new StreamsException(
-                String.format("Fatal user code error in store restore listener for store %s, partition %s.",
-                    storeName,
-                    topicPartition),
-                fatalUserException);
-        }
+        userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
         storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
     }