Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/05/17 19:57:12 UTC

[jira] [Commented] (STORM-1682) Kafka spout can lose partitions

    [ https://issues.apache.org/jira/browse/STORM-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15287429#comment-15287429 ] 

ASF GitHub Bot commented on STORM-1682:
---------------------------------------

GitHub user srdo reopened a pull request:

    https://github.com/apache/storm/pull/1340

    STORM-1682: Refactor DynamicBrokersReader to lookup leader information via Kafka metadata API. Put Zookeeper metadata lookup into new class.
    
    See https://issues.apache.org/jira/browse/STORM-1682.
    
    This solution splits DynamicBrokersReader into two classes. The new ZkBrokersReader reads the broker list from Zookeeper and uses it to construct one or more SimpleConsumers. DynamicBrokersReader then uses those consumers to look up partition leaders through the Kafka metadata API, instead of reading leaders and brokers from Zookeeper.
    
    DynamicBrokersReader should now cause the spout to restart in the following scenarios:
    - If it can't find any brokers in Zookeeper
    - If it can't contact any of the brokers it found in Zookeeper
    - If wildcard topics are listed in Zookeeper, and no broker has metadata for all of them
    
    To test behavior when brokers shut down, the Kafka test broker had to be separated from Zookeeper so that multiple brokers can run in one cluster. This is the reason for the modifications in many tests.
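
As a rough illustration of the three restart scenarios described above, the following self-contained Java sketch models the lookup. It is not the code from the pull request: `LeaderLookup`, `MetadataSource`, and `findLeaders` are hypothetical names, and `MetadataSource` merely stands in for a metadata request sent to a broker through a SimpleConsumer. It also assumes that throwing an unchecked exception is what makes the spout restart.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LeaderLookup {

    /**
     * Hypothetical stand-in for a metadata request to one broker.
     * Returns topic -> (partition -> leader "host:port"),
     * or throws IOException if the broker cannot be contacted.
     */
    public interface MetadataSource {
        Map<String, Map<Integer, String>> fetch(String broker, Set<String> topics)
                throws IOException;
    }

    /**
     * Asks each known broker in turn and returns the first response that
     * covers every requested topic. Throws IllegalStateException in the
     * three failure scenarios so the caller fails loudly instead of
     * silently dropping partitions.
     */
    public static Map<String, Map<Integer, String>> findLeaders(
            List<String> brokers, Set<String> topics, MetadataSource source) {
        if (brokers.isEmpty()) {
            // Scenario 1: no brokers were found at all.
            throw new IllegalStateException("No brokers found");
        }
        boolean anyReachable = false;
        for (String broker : brokers) {
            try {
                Map<String, Map<Integer, String>> metadata = source.fetch(broker, topics);
                anyReachable = true;
                if (metadata.keySet().containsAll(topics)) {
                    return metadata;
                }
                // This broker lacks metadata for some topic: try the next one.
            } catch (IOException e) {
                // This broker is unreachable: try the next one.
            }
        }
        // Scenario 3 if some broker answered, scenario 2 if none did.
        throw new IllegalStateException(anyReachable
                ? "No broker has metadata for all topics"
                : "Could not contact any broker");
    }
}
```

In this sketch a broker that is down is simply skipped, so a single dead broker does not hide partitions as long as another broker can answer for every topic.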

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-1682

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1340.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1340
    
----
commit 78a3d0683d1bfbfec7bbc7425927347b4b47154d
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2016-04-12T18:24:49Z

    STORM-1682: Refactor DynamicBrokersReader to lookup leader information via Kafka metadata API. Put Zookeeper metadata lookup into new class.

commit af72ce6229ec1332d7569846121e3b07b3b42b33
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2016-04-14T14:37:47Z

    STORM-1682: Add check of metadata request error code in DynamicBrokersReader

commit 430473be42f68d65aed471af9a808f5c7c599f39
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2016-04-14T15:38:09Z

    STORM-1682: Replace TopicCommand with AdminUtils, replace KafkaServerStartable with KafkaServer. The two former contain System.exit calls

commit 3d323392c1f809d6e829b641d0ecb1719865bde8
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2016-04-21T09:50:20Z

    Merge branch 'master' of https://github.com/apache/storm into STORM-1682

commit e9f3f860b5aecccd9c9fb9cc086fac1bb619f5bd
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2016-04-21T09:45:39Z

    STORM-1682: Fix NPE in DynamicBrokersReader when leader is missing/down

commit dc3c0fb73cb1074fd7d38d1ada9ef0c021957eb5
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2016-05-16T11:38:54Z

    STORM-1682: Adjust for Kafka 0.8.2.1

----


> Kafka spout can lose partitions
> -------------------------------
>
>                 Key: STORM-1682
>                 URL: https://issues.apache.org/jira/browse/STORM-1682
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.10.0, 1.0.0, 2.0.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>
> The KafkaSpout can lose partitions for a period, or hang, because getBrokersInfo (https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java#L77) may get a NoNodeException if Zookeeper has no broker info corresponding to the leader id it lists for a partition. When this error occurs, the spout ignores the partition until getBrokersInfo is called again, which doesn't happen until the spout next gets an exception on fetch. If the timing is really bad, it might ignore all the partitions and never restart.
> As far as I'm aware, Kafka doesn't update leader and broker info atomically, so it's possible to get unlucky and hit the NoNodeException when a broker has just died.
> I have a few suggestions for dealing with this.
> getBrokersInfo could simply retry the inner loop over partitions if it gets the NoNodeException (probably with a limit and a short sleep between attempts). If it fails repeatedly, the spout should crash.
> Alternatively, the DynamicBrokersReader could look up all brokers in Zookeeper, create a consumer, and send a TopicMetadataRequest on it. The response contains the leader for each partition and the host/port for the relevant brokers.
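
The first suggestion above (retry with a limit and a short sleep, then fail) could look roughly like the following Java sketch. It is only an illustration under stated assumptions: `BoundedRetry`, `Lookup`, and `MissingNodeException` are hypothetical names, with `MissingNodeException` standing in for Zookeeper's NoNodeException, and the final unchecked exception is assumed to be what crashes the spout.

```java
public class BoundedRetry {

    /** Hypothetical stand-in for Zookeeper's NoNodeException. */
    public static class MissingNodeException extends Exception {
        public MissingNodeException(String message) {
            super(message);
        }
    }

    /** Hypothetical lookup that may fail while broker info is missing. */
    @FunctionalInterface
    public interface Lookup<T> {
        T get() throws MissingNodeException;
    }

    /**
     * Calls the lookup up to maxAttempts times, sleeping sleepMillis
     * between failed attempts. If every attempt fails, throws an
     * IllegalStateException wrapping the last failure.
     */
    public static <T> T retry(Lookup<T> lookup, int maxAttempts, long sleepMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        MissingNodeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (MissingNodeException e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
            }
        }
        throw new IllegalStateException(
                "Lookup still failing after " + maxAttempts + " attempts", last);
    }
}
```

A caller would wrap the per-partition lookup, for example `BoundedRetry.retry(() -> readLeaders(), 3, 100L)`, where `readLeaders` is a hypothetical method that throws `MissingNodeException` while the broker node is absent.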


