Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2017/08/30 18:03:12 UTC

[GitHub] storm pull request #2300: STORM-2691: Make storm-kafka-client implement the ...

GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/2300

    STORM-2691: Make storm-kafka-client implement the Trident interface correctly

    This PR is based on https://github.com/apache/storm/pull/2271, so please ignore the first commit. The goal of this PR is to fix the storm-kafka-client Trident spout so it implements the Trident API as intended. The current implementation takes some shortcuts that used to be necessary to support the KafkaConsumer.subscribe API, but they cause the issues listed below.
    
    * The changes in https://github.com/apache/storm/pull/2009, released in 1.1.0, modified the OpaquePartitionedTridentSpoutExecutor in a way that likely broke IOpaquePartitionedTridentSpout implementations other than storm-kafka-client. The old code requested sorted partitions from the spout via getOrderedPartitions, did a round-robin partitioning, and assigned partitions via refreshPartitions https://github.com/apache/storm/blob/v1.0.4/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L100. The new code just passes the output of getOrderedPartitions into refreshPartitions https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L120. It looks to me like refreshPartitions is now passed the list of all partitions assigned to any spout task, rather than just the partitions assigned to the current task.
    
    The proposed fix will use getOrderedPartitions to get the sorted partitions list, pass the list into getPartitionsForTask, and pass the resulting list of assigned partitions back into refreshPartitions.
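
A minimal sketch of the round-robin step described above, assuming partitions are already sorted and tasks are numbered from 0. The class and method names here (TaskPartitionAssignment, partitionsForTask) are hypothetical and are not the Storm API; the real getPartitionsForTask signature may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TaskPartitionAssignment {

    /**
     * Hypothetical helper, not part of Storm: from a sorted partition list,
     * pick every numTasks-th entry starting at taskIndex (round-robin).
     */
    static <T> List<T> partitionsForTask(List<T> sortedPartitions, int taskIndex, int numTasks) {
        if (numTasks <= 0 || taskIndex < 0 || taskIndex >= numTasks) {
            throw new IllegalArgumentException("taskIndex must be in [0, numTasks)");
        }
        List<T> assigned = new ArrayList<>();
        for (int i = taskIndex; i < sortedPartitions.size(); i += numTasks) {
            assigned.add(sortedPartitions.get(i));
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Stand-in for the sorted output of getOrderedPartitions.
        List<String> sorted = Arrays.asList("topic-0", "topic-1", "topic-2", "topic-3", "topic-4");
        int numTasks = 2;
        for (int task = 0; task < numTasks; task++) {
            // Only this subset would be handed to refreshPartitions for the task.
            System.out.println("task " + task + " -> " + partitionsForTask(sorted, task, numTasks));
        }
    }
}
```

With five partitions and two tasks, task 0 gets topic-0, topic-2 and topic-4, and task 1 gets topic-1 and topic-3. Because every task sorts the same list and applies the same rule, no partition is handed to more than one task.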
    
    * The current implementation of the Trident Kafka spout only has one consumer in the Emitter. The Coordinator needs to know which partitions are involved in a batch, and this information is shared via a static field the Emitter writes to. The Coordinator and Emitter are both running as regular bolts, so there's no guarantee that they're in the same JVM.
    
    The Subscription interface doesn't fit Trident very well. Trident assumes that there is a coordinator bolt that determines which partitions should be involved in a batch, and that the emitter bolts partition the list emitted by the coordinator and each emit only for the partitions assigned to that emitter. The Subscription interface only has a `subscribe()` method that does everything, which means we can't delegate parts of the subscription process to the coordinator. The current spout works around this by assuming that the coordinator bolt will be in the same worker as one or more emitters, so the emitter handles everything while the coordinator reads partition information from a static field. This only works if the coordinator happens to share a worker with the emitters, and it causes issues when the coordinator emits a set of partitions and the emitters do a reassignment before the message arrives.
    
    The suggested fix is to split the subscription process into 4 steps that fit what Trident needs: get all partitions for the spout tasks, sort the partitions, decide which partitions belong to this task, and do the assignment. This allows us to get rid of the shared static field and split the subscription process across the coordinator and emitters. We'll lose a bit of flexibility in implementing Subscriptions, since we're now locking implementations into following these steps, but I don't think it's a great loss.
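
The four steps can be sketched roughly as follows. This is an illustration only: every name in it (SplitSubscriptionSketch, allPartitions, sorted, forTask, Emitter) is hypothetical, partitions are modeled as plain strings, and the Kafka lookup and consumer assignment are replaced by stand-ins. It is not the interface introduced by this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SplitSubscriptionSketch {

    /** Step 1: discover all partitions for the spout. Stand-in for a Kafka metadata lookup. */
    static List<String> allPartitions() {
        return new ArrayList<>(Arrays.asList("events-2", "events-0", "events-3", "events-1"));
    }

    /** Step 2: sort, so the coordinator and every emitter agree on one order. */
    static List<String> sorted(List<String> partitions) {
        List<String> copy = new ArrayList<>(partitions);
        Collections.sort(copy);
        return copy;
    }

    /** Step 3: decide which of the sorted partitions belong to the given task (round-robin). */
    static List<String> forTask(List<String> sortedPartitions, int taskIndex, int numTasks) {
        List<String> mine = new ArrayList<>();
        for (int i = taskIndex; i < sortedPartitions.size(); i += numTasks) {
            mine.add(sortedPartitions.get(i));
        }
        return mine;
    }

    /** Step 4: the emitter performs the assignment. Stand-in for assigning to a consumer. */
    static final class Emitter {
        private List<String> assigned = Collections.emptyList();

        void assign(List<String> partitions) {
            assigned = new ArrayList<>(partitions);
        }

        List<String> assigned() {
            return assigned;
        }
    }

    public static void main(String[] args) {
        // The coordinator runs steps 1 and 2 and emits the result with the batch.
        List<String> emittedByCoordinator = sorted(allPartitions());

        // Each emitter runs steps 3 and 4 on the list it received; no static field is shared.
        int numTasks = 2;
        for (int task = 0; task < numTasks; task++) {
            Emitter emitter = new Emitter();
            emitter.assign(forTask(emittedByCoordinator, task, numTasks));
            System.out.println("emitter " + task + " assigned " + emitter.assigned());
        }
    }
}
```

The point of the split is that the emitters work only from the list the coordinator sent, so the two sides no longer need to live in the same JVM.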
    
    I don't think we can port this to 1.x without breaking the public API for the spout. I'd be okay with breaking the API in 1.2.0 unless anyone has an idea for a workaround.
    
    Other small changes:
    * The Trident spout was previously checking that all output stream fields were identical. As far as I can tell, Trident only supports one output stream, so check for that instead.
    * Removed an empty transactional spout implementation.
    * Remove TopicPartition from KafkaTridentSpoutBatchMetadata. It is unnecessary because the OpaquePartitionedTridentSpoutExecutor keeps track of which metadata belongs to which partition.
    * Upgrade Mockito to latest 1.x version for Rule support.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2691

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2300
    
----
commit 54a829ceba6c1575d0665721509889e4b60dd066
Author: Stig Rohde Døssing <sr...@apache.org>
Date:   2017-08-04T00:53:42Z

    STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta objects to Zookeeper

commit 2d1a9cb473d766b52dafa67115a087535939f7b0
Author: Stig Rohde Døssing <sr...@apache.org>
Date:   2017-08-18T20:13:38Z

    STORM-2691: storm-kafka-client Trident spout does not implement the Trident spout interface properly

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #2300: STORM-2691: Make storm-kafka-client implement the Trident...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2300
  
    @srdo this change should go on https://github.com/apache/storm/pull/2174


---

[GitHub] storm pull request #2300: STORM-2691: Make storm-kafka-client implement the ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2300


---

[GitHub] storm issue #2300: STORM-2691: Make storm-kafka-client implement the Trident...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2300
  
    @srdo 
    Yeah, I was aware that you talked offline, but I couldn't get any indication that the concern about the patch was resolved, since @hmcl talked about another issue at that time.
    
    Regarding breaking backward compatibility: if it is needed to fix a "broken" thing, we would want to do that instead of leaving it broken. Let's raise a discussion thread and see whether there's any objection to breaking it.


---

[GitHub] storm issue #2300: STORM-2691: Make storm-kafka-client implement the Trident...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2300
  
    Rebased to fix conflicts. 


---

[GitHub] storm issue #2300: STORM-2691: Make storm-kafka-client implement the Trident...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2300
  
    Thanks for the reviews.
    
    @HeartSaVioR Sorry, @hmcl and I talked offline a while ago. It's my impression that we're good.
    
    I'm not planning to port this to 1.x, since it requires breaking changes. I can't think of a way to fix this without replacing the Subscription interface.


---

[GitHub] storm issue #2300: STORM-2691: Make storm-kafka-client implement the Trident...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/2300
  
    +1


---