You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 07:54:45 UTC
[1/3] storm git commit: STORM-2239: Handle InterruptException in new
Kafka spout
Repository: storm
Updated Branches:
refs/heads/1.x-branch 55b3099e1 -> b515356c1
STORM-2239: Handle InterruptException in new Kafka spout
* Add message to InterruptedExceptions thrown when Kafka consumer is interrupted
* Closes #1821
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/145af644
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/145af644
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/145af644
Branch: refs/heads/1.x-branch
Commit: 145af644324b62f2c060c6e795ac6b5474fd17d2
Parents: 55b3099
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Tue Nov 29 21:17:36 2016 +0100
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 16:54:03 2016 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 52 ++++++++++++++------
1 file changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/145af644/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 6528ae9..38095fe 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -53,6 +53,8 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import org.apache.kafka.common.errors.InterruptException;
+
public class KafkaSpout<K, V> extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
@@ -199,22 +201,32 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void nextTuple() {
- if (initialized) {
- if (commit()) {
- commitOffsetsForAckedTuples();
- }
+ try{
+ if (initialized) {
+ if (commit()) {
+ commitOffsetsForAckedTuples();
+ }
- if (poll()) {
- setWaitingToEmit(pollKafkaBroker());
- }
+ if (poll()) {
+ setWaitingToEmit(pollKafkaBroker());
+ }
- if (waitingToEmit()) {
- emit();
+ if (waitingToEmit()) {
+ emit();
+ }
+ } else {
+ LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
}
- } else {
- LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
+ } catch (InterruptException e) {
+ throwKafkaConsumerInterruptedException();
}
}
+
+ private void throwKafkaConsumerInterruptedException() {
+ //Kafka throws their own type of exception when interrupted.
+ //Throw a new Java InterruptedException to ensure Storm can recognize the exception as a reaction to an interrupt.
+ throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
+ }
private boolean commit() {
return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
@@ -358,7 +370,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void activate() {
- subscribeKafkaConsumer();
+ try {
+ subscribeKafkaConsumer();
+ } catch (InterruptException e) {
+ throwKafkaConsumerInterruptedException();
+ }
}
private void subscribeKafkaConsumer() {
@@ -381,12 +397,20 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void deactivate() {
- shutdown();
+ try {
+ shutdown();
+ } catch (InterruptException e) {
+ throwKafkaConsumerInterruptedException();
+ }
}
@Override
public void close() {
- shutdown();
+ try {
+ shutdown();
+ } catch (InterruptException e) {
+ throwKafkaConsumerInterruptedException();
+ }
}
private void shutdown() {
[2/3] storm git commit: Merge branch 'STORM-2239-1.x-merge' into
1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-2239-1.x-merge' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/45e05b0c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/45e05b0c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/45e05b0c
Branch: refs/heads/1.x-branch
Commit: 45e05b0c18dc9dfb05b666edc171f385c8c898b0
Parents: 55b3099 145af64
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 16:54:14 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 16:54:14 2016 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 52 ++++++++++++++------
1 file changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: STORM-2239: CHANGELOG
Posted by ka...@apache.org.
STORM-2239: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b515356c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b515356c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b515356c
Branch: refs/heads/1.x-branch
Commit: b515356c1807c3657ffaac6776bac75ab565a94f
Parents: 45e05b0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 16:54:36 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 16:54:36 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b515356c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3661e45..b965cf4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-2239: Handle InterruptException in new Kafka spout
* STORM-2087: Storm-kafka-client: Failed tuples are not always replayed
* STORM-2238: Add Timestamp extractor for windowed bolt
* STORM-2246: Logviewer download link has urlencoding on part of the URL