You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/29 09:27:06 UTC
kafka git commit: KAFKA-5949; Follow-up after latest KIP-161 changes
Repository: kafka
Updated Branches:
refs/heads/trunk eaabb6cd0 -> b79b17971
KAFKA-5949; Follow-up after latest KIP-161 changes
- compare KAFKA-5958
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>
Closes #3986 from mjsax/kafka-5949-exceptions-user-callbacks-KIP-161-follow-up
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b79b1797
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b79b1797
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b79b1797
Branch: refs/heads/trunk
Commit: b79b179716b5f8bacb870a53a5a9216a0687b3c9
Parents: eaabb6c
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Sep 29 10:21:57 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Sep 29 10:21:57 2017 +0100
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 30 ++++++++++++++++++--
.../internals/CompositeRestoreListener.java | 30 ++------------------
2 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b79b1797/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2f5ce4b..928d0e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -562,21 +562,45 @@ public class KafkaStreams {
@Override
public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
if (globalStateRestoreListener != null) {
- globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+ try {
+ globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+ } catch (final Exception fatalUserException) {
+ throw new StreamsException(
+ String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+ storeName,
+ topicPartition),
+ fatalUserException);
+ }
}
}
@Override
public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) {
if (globalStateRestoreListener != null) {
- globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+ try {
+ globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+ } catch (final Exception fatalUserException) {
+ throw new StreamsException(
+ String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+ storeName,
+ topicPartition),
+ fatalUserException);
+ }
}
}
@Override
public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
if (globalStateRestoreListener != null) {
- globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+ try {
+ globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+ } catch (final Exception fatalUserException) {
+ throw new StreamsException(
+ String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+ storeName,
+ topicPartition),
+ fatalUserException);
+ }
}
}
};
http://git-wip-us.apache.org/repos/asf/kafka/blob/b79b1797/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
index a1c2f7f..01ba457 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
@@ -55,15 +55,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
final String storeName,
final long startingOffset,
final long endingOffset) {
- try {
- userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
- } catch (final Exception fatalUserException) {
- throw new StreamsException(
- String.format("Fatal user code error in store restore listener for store %s, partition %s.",
- storeName,
- topicPartition),
- fatalUserException);
- }
+ userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
}
@@ -76,15 +68,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
final String storeName,
final long batchEndOffset,
final long numRestored) {
- try {
- userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
- } catch (final Exception fatalUserException) {
- throw new StreamsException(
- String.format("Fatal user code error in store restore listener for store %s, partition %s.",
- storeName,
- topicPartition),
- fatalUserException);
- }
+ userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
}
@@ -96,15 +80,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
public void onRestoreEnd(final TopicPartition topicPartition,
final String storeName,
final long totalRestored) {
- try {
- userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
- } catch (final Exception fatalUserException) {
- throw new StreamsException(
- String.format("Fatal user code error in store restore listener for store %s, partition %s.",
- storeName,
- topicPartition),
- fatalUserException);
- }
+ userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
}