Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2016/12/09 22:32:39 UTC

[GitHub] storm pull request #1821: STORM-2239: Handle InterruptException in new Kafka...

GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/1821

    STORM-2239: Handle InterruptException in new Kafka spout

    See https://issues.apache.org/jira/browse/STORM-2239.
    
    This basically just ensures that if the Kafka consumer throws InterruptException, we catch it and interrupt the current thread before returning to Storm.
    
    I realize that this won't be an issue until the next Kafka release, but it doesn't hurt to get it fixed now.
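
The approach described above (catch Kafka's exception, then re-interrupt the current thread) might look roughly like the sketch below. It is an illustration only, not the code from the pull request: the nested `InterruptException` is a stand-in for `org.apache.kafka.common.errors.InterruptException` so the example compiles without Kafka, and `pollConsumer()` and `ReinterruptSketch` are hypothetical names.

```java
class ReinterruptSketch {
    // Stand-in for Kafka's unchecked InterruptException (assumption: Kafka is
    // not on the classpath for this sketch).
    static class InterruptException extends RuntimeException {
        InterruptException(String message) { super(message); }
    }

    // Hypothetical stand-in for a consumer call that was interrupted.
    static void pollConsumer() {
        throw new InterruptException("consumer interrupted");
    }

    // Shaped like a spout method: catch the Kafka-style exception and restore
    // the interrupt flag before returning to the caller.
    static void nextTuple() {
        try {
            pollConsumer();
        } catch (InterruptException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        nextTuple();
        // Thread.interrupted() reads the flag and clears it.
        System.out.println("interrupted after nextTuple: " + Thread.interrupted());
    }
}
```

With the flag restored, the caller can notice the interrupt the next time it checks the flag or enters a blocking call.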

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2239

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1821.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1821
    
----
commit efcefc0acd21e3065684107ae313d87a1ff1deda
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2016-11-29T20:17:36Z

    STORM-2239: Handle InterruptException in new Kafka spout

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1821: STORM-2239: Handle InterruptException in new Kafka...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1821#discussion_r92691637
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -435,7 +435,7 @@ public void close() {
             } catch (InterruptException e) {
                 //Kafka throws their own type of exception when interrupted.
                 //Throw a new Java InterruptedException to ensure Storm can recognize the exception as a reaction to an interrupt.
    -            throw new RuntimeException(new InterruptedException());
    +            throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
             }
    --- End diff --
    
    @hmcl Better?



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    +1



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    @srdo I think I understand what you mean. However, I think the cleanest way to do this is to catch Kafka's InterruptException and call the spout's `close()` method. If it is an error from which it cannot recover, it should close itself. This exception propagation is very cryptic, not to mention that the two exception names are nearly identical.



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    @srdo [Kafka's InterruptException](https://kafka.apache.org/090/javadoc/org/apache/kafka/common/errors/InterruptException.html) is a RuntimeException; otherwise the code wouldn't even compile without the exception either being caught or declared in the method signature. I think that the code as is will simply throw a RuntimeException and shut down the JVM.
    
    We don't really do this type of check elsewhere in the codebase, and in particular not in the old spout. My opinion is that we don't need this check, which will make the code a bit more complex for little gain.



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    +1



[GitHub] storm pull request #1821: STORM-2239: Handle InterruptException in new Kafka...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1821



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    I went ahead and replaced the interrupts with `throw new RuntimeException(new InterruptedException());`. That way the executor stops immediately instead of waiting for the next check in the async loop.



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    @ptgoetz Sure, I'll add that.



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    One minor nit: It would be helpful to construct the exceptions with a message explaining what happened.
    
    Other than that I'm +1.



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    +1



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    @hmcl The code as is will work fine in distributed mode. The problem is when you're using the spout in an integration test, or are running the cluster in local mode for some other reason.
    
    When you're running an integration test in local mode, exceptions thrown by the executors are propagated to the testing code. If we don't catch and handle InterruptException, the testing code has to handle this exception itself during shutdown. As far as I can tell (not sure though), the local mode code also stops shutting down executor threads once one of them throws an exception, so throwing exceptions from the spout might prevent other executor threads from being shut down. This is less of an issue if it only happens when a real error occurs, but it's not great if it happens randomly when this spout is shut down. Either way, there's no reason we shouldn't handle InterruptException internally in the spout instead of showing it to users.
    
    We didn't do it in the old spout because the old spout would be throwing InterruptedException (the built-in Java exception), and not InterruptException (the Kafka-specific exception that Storm doesn't know about). So when the old spout shut down, any InterruptedException would be caught by the check here https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/utils/Utils.java#L2181, which would let the cluster shut down cleanly. 
    
    About the last line in my previous comment: The exception being wrapped is a new Java InterruptedException, not a Kafka InterruptException. The point is to convert the InterruptException, which is semantically identical to Java's InterruptedException, into a form that Storm will treat like a normal InterruptedException (which it essentially is).
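
To make the difference between the two exception types concrete, here is a minimal sketch of a cause-chain check of the kind the comment describes. It is not Storm's actual code: `causeIsInstanceOf` and `CauseChainSketch` are hypothetical names, and the nested `InterruptException` is only a stand-in for Kafka's unchecked exception.

```java
class CauseChainSketch {
    // Stand-in for Kafka's unchecked InterruptException (assumption: Kafka is
    // not on the classpath for this sketch).
    static class InterruptException extends RuntimeException {
        InterruptException(String message) { super(message); }
    }

    // Hypothetical helper: true if t, or any exception in its cause chain,
    // is an instance of the given type.
    static boolean causeIsInstanceOf(Class<? extends Throwable> type, Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable kafkaStyle = new InterruptException("consumer interrupted");
        Throwable converted =
            new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));

        // The Kafka-style exception is not recognized as an interrupt.
        System.out.println(causeIsInstanceOf(InterruptedException.class, kafkaStyle)); // false
        // The converted form carries a Java InterruptedException as its cause.
        System.out.println(causeIsInstanceOf(InterruptedException.class, converted));  // true
    }
}
```

A check that only looks for `java.lang.InterruptedException` has no way to recognize the Kafka type, which is why the conversion is needed.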



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    @srdo what is `Thread.currentThread().interrupt();` trying to accomplish?



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    When Storm is trying to shut down, it interrupts the executor threads. This should cause the async loop (https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/utils/Utils.java#L2177) to stop running, since an InterruptedException will be thrown either from within the spout or from the sleep statement in the loop. The loop catches and handles InterruptedException so we don't crash the worker while shutting down. When interrupted, the Kafka consumer doesn't throw InterruptedException but instead throws a Kafka-specific unchecked InterruptException. If we don't catch and convert it to something Storm understands, the spout will cause the worker to crash when shutting down. This is very inconvenient for local mode clusters, since that means the VM running the cluster gets killed. The ways to convert the exception are either to throw a new (wrapped) InterruptedException out of the spout, or to reinterrupt the running thread.
    
    I can replace the interrupt with `throw new RuntimeException(new InterruptedException())` if you like?
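
The two conversion options can be compared against a much simplified loop. The sketch below is an assumption-laden illustration, not Storm's async loop: `runLoop` and `AsyncLoopSketch` are hypothetical names, and the loop only models "run the body, sleep, treat an interrupt as a clean stop".

```java
class AsyncLoopSketch {
    // Runs body repeatedly, sleeping between iterations, and returns the number
    // of completed iterations once an interrupt is observed.
    static int runLoop(Runnable body, long sleepMillis) {
        int iterations = 0;
        try {
            while (true) {
                body.run();
                iterations++;
                // Throws InterruptedException if the interrupt flag is set.
                Thread.sleep(sleepMillis);
            }
        } catch (InterruptedException e) {
            return iterations; // interrupt noticed at the sleep: clean stop
        } catch (RuntimeException e) {
            if (e.getCause() instanceof InterruptedException) {
                return iterations; // wrapped interrupt from the body: clean stop
            }
            throw e; // anything else is treated as a real error
        }
    }

    public static void main(String[] args) {
        // Option 1: the body re-interrupts the thread; the loop stops at the sleep.
        int reinterrupt = runLoop(() -> Thread.currentThread().interrupt(), 10);
        // Option 2: the body throws a wrapped InterruptedException; the loop stops at once.
        int wrapped = runLoop(() -> {
            throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
        }, 10);
        System.out.println(reinterrupt + " " + wrapped); // prints "1 0"
    }
}
```

In this model the re-interrupt variant finishes the current iteration before stopping, while the wrapped exception stops the loop from inside the body. Any other unchecked exception escapes the loop.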



[GitHub] storm issue #1821: STORM-2239: Handle InterruptException in new Kafka spout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1821
  
    @hmcl I'd be happy to add comments explaining the propagation. Calling `close()` isn't enough. When the Kafka exception is thrown, the thread interrupt state is cleared (same behavior as the Java exception). If we just call `close()`, the executor thread keeps running because it's no longer interrupted (though it would probably crash on the next call to nextTuple since the consumer was closed). We have to either throw a wrapped InterruptedException, or reinterrupt the thread.
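
The point about the cleared interrupt state can be checked with plain Java. The sketch below uses `Thread.sleep` and Java's own InterruptedException, since the comment says the Kafka exception behaves the same way; `InterruptFlagSketch` and its method are hypothetical names.

```java
class InterruptFlagSketch {
    // Interrupts the current thread, enters a blocking call, and reports the
    // interrupt flag as seen right after the exception is caught.
    static boolean flagAfterInterruptedSleep() {
        Thread.currentThread().interrupt(); // simulate the executor being interrupted
        try {
            Thread.sleep(10); // throws immediately because the flag is set
            // Not expected to be reached; clear the flag so the caller is unaffected.
            return Thread.interrupted();
        } catch (InterruptedException e) {
            // Throwing the exception cleared the flag.
            return Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println("flag after catch: " + flagAfterInterruptedSleep()); // false
    }
}
```

Because the flag is already false inside the catch block, code that only cleans up there leaves nothing for an outer loop to detect, which is the case the comment describes.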

