You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 07:54:45 UTC

[1/3] storm git commit: STORM-2239: Handle InterruptException in new Kafka spout

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 55b3099e1 -> b515356c1


STORM-2239: Handle InterruptException in new Kafka spout

* Add message to InterruptedExceptions thrown when Kafka consumer is interrupted
* Closes #1821


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/145af644
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/145af644
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/145af644

Branch: refs/heads/1.x-branch
Commit: 145af644324b62f2c060c6e795ac6b5474fd17d2
Parents: 55b3099
Author: Stig Rohde D�ssing <sd...@it-minds.dk>
Authored: Tue Nov 29 21:17:36 2016 +0100
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 16:54:03 2016 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 52 ++++++++++++++------
 1 file changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/145af644/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 6528ae9..38095fe 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -53,6 +53,8 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
+import org.apache.kafka.common.errors.InterruptException;
+
 public class KafkaSpout<K, V> extends BaseRichSpout {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
     private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
@@ -199,22 +201,32 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public void nextTuple() {
-        if (initialized) {
-            if (commit()) {
-                commitOffsetsForAckedTuples();
-            }
+        try{
+            if (initialized) {
+                if (commit()) {
+                    commitOffsetsForAckedTuples();
+                }
 
-            if (poll()) {
-                setWaitingToEmit(pollKafkaBroker());
-            }
+                if (poll()) {
+                    setWaitingToEmit(pollKafkaBroker());
+                }
 
-            if (waitingToEmit()) {
-                emit();
+                if (waitingToEmit()) {
+                    emit();
+                }
+            } else {
+                LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
             }
-        } else {
-            LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
+        } catch (InterruptException e) {
+            throwKafkaConsumerInterruptedException();
         }
     }
+    
+    private void throwKafkaConsumerInterruptedException() {
+        //Kafka throws their own type of exception when interrupted.
+        //Throw a new Java InterruptedException to ensure Storm can recognize the exception as a reaction to an interrupt.
+        throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
+    }
 
     private boolean commit() {
         return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
@@ -358,7 +370,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public void activate() {
-        subscribeKafkaConsumer();
+        try {
+            subscribeKafkaConsumer();
+        } catch (InterruptException e) {
+            throwKafkaConsumerInterruptedException();
+        }
     }
 
     private void subscribeKafkaConsumer() {
@@ -381,12 +397,20 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public void deactivate() {
-        shutdown();
+        try {
+            shutdown();
+        } catch (InterruptException e) {
+            throwKafkaConsumerInterruptedException();
+        }
     }
 
     @Override
     public void close() {
-        shutdown();
+        try {
+            shutdown();
+        } catch (InterruptException e) {
+            throwKafkaConsumerInterruptedException();
+        }
     }
 
     private void shutdown() {


[2/3] storm git commit: Merge branch 'STORM-2239-1.x-merge' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-2239-1.x-merge' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/45e05b0c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/45e05b0c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/45e05b0c

Branch: refs/heads/1.x-branch
Commit: 45e05b0c18dc9dfb05b666edc171f385c8c898b0
Parents: 55b3099 145af64
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 16:54:14 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 16:54:14 2016 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 52 ++++++++++++++------
 1 file changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: STORM-2239: CHANGELOG

Posted by ka...@apache.org.
STORM-2239: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b515356c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b515356c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b515356c

Branch: refs/heads/1.x-branch
Commit: b515356c1807c3657ffaac6776bac75ab565a94f
Parents: 45e05b0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Dec 20 16:54:36 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Dec 20 16:54:36 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b515356c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3661e45..b965cf4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2239: Handle InterruptException in new Kafka spout
  * STORM-2087: Storm-kafka-client: Failed tuples are not always replayed
  * STORM-2238: Add Timestamp extractor for windowed bolt
  * STORM-2246: Logviewer download link has urlencoding on part of the URL