Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2017/08/10 17:43:42 UTC

[GitHub] storm issue #2174: STORM-2554: Trident Kafka Spout Refactoring to Include Ma...

Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2174
  
    I think we should consider whether we can improve the way this spout implements the Trident API.
    
    The Trident API expects the Coordinator to figure out which partitions exist for a batch. Those partitions are passed to the spout executors (as "coordinatorMeta"), and the Emitter is expected to filter that list down to the partitions assigned to itself. Please see https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L112.
    
    Deciding which partitions exist is the Coordinator's responsibility, but this implementation puts that responsibility in the Emitter. That forces us to hack around it, e.g. with the enum instance, or by dropping an emit because the partition is no longer assigned to this task. It will break if the scheduler happens to put the Coordinator in a different worker from any of the Emitter tasks. The Emitter code also ends up being confusing: refreshPartitions, for example, does nothing except log based on the coordinatorMeta, which may be different from what the spout is actually assigned. I also think OpaquePartitionedTridentSpoutExecutor would be easier to maintain if we didn't have to keep in mind that the Kafka spout doesn't implement the API in the expected way.
    
    We already have the clean separation we need to implement the API as specified (PartitionFilter and ManualPartitioner), but the two are conflated in the Subscription interface. The Coordinator should use PartitionFilter and its own KafkaConsumer instance (which we should add) to get the list of batch partitions instead of asking the KafkaTridentSpoutManager, so that it is nicely decoupled from the Emitter. We should put the refresh subscription timer in the Coordinator as well. The Emitter should receive these partitions in getPartitionsForTask, use ManualPartitioner to decide which partitions are assigned to the task, and assign them on the consumer.
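    
    To make the proposed split a bit more concrete, here is a rough sketch in plain Java. It deliberately avoids the real Storm and Kafka classes, so everything in it is an assumption for illustration: TopicPartitionStub, partitionsForBatch and partitionsForTask are hypothetical names, the Predicate stands in for PartitionFilter, and the round-robin assignment is only a stand-in for whatever ManualPartitioner would actually do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

public class TridentPartitionSketch {

    /** Hypothetical stand-in for Kafka's TopicPartition. */
    static final class TopicPartitionStub {
        final String topic;
        final int partition;

        TopicPartitionStub(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /**
     * Coordinator side: decide which partitions exist for the batch.
     * 'known' stands in for what the Coordinator's own consumer would report;
     * 'filter' stands in for PartitionFilter (assumption).
     * The result is sorted so every task sees the same order.
     */
    static List<TopicPartitionStub> partitionsForBatch(
            List<TopicPartitionStub> known, Predicate<TopicPartitionStub> filter) {
        List<TopicPartitionStub> result = new ArrayList<>();
        for (TopicPartitionStub tp : known) {
            if (filter.test(tp)) {
                result.add(tp);
            }
        }
        result.sort(Comparator
                .comparing((TopicPartitionStub tp) -> tp.topic)
                .thenComparingInt(tp -> tp.partition));
        return result;
    }

    /**
     * Emitter side: pick this task's share of the batch partitions.
     * Round-robin by task index is an assumed stand-in for ManualPartitioner.
     */
    static List<TopicPartitionStub> partitionsForTask(
            int taskIndex, int numTasks, List<TopicPartitionStub> batchPartitions) {
        List<TopicPartitionStub> mine = new ArrayList<>();
        for (int i = taskIndex; i < batchPartitions.size(); i += numTasks) {
            mine.add(batchPartitions.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        List<TopicPartitionStub> known = Arrays.asList(
                new TopicPartitionStub("orders", 1),
                new TopicPartitionStub("logs", 0),
                new TopicPartitionStub("orders", 0));

        // The Coordinator computes the batch partitions once...
        List<TopicPartitionStub> batch =
                partitionsForBatch(known, tp -> tp.topic.equals("orders"));

        // ...and each Emitter task only narrows that list down to its own share.
        int numTasks = 2;
        for (int task = 0; task < numTasks; task++) {
            System.out.println("task " + task + " -> "
                    + partitionsForTask(task, numTasks, batch));
        }
    }
}
```

    The point of the sketch is only the direction of the data flow: the Emitter never discovers partitions on its own, it just narrows down the list the Coordinator hands it.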
    
    We'd need to change either KafkaSpoutConfig or Subscription a bit so we can get at the partitioner and filter classes, but I think it should be doable.
    
    Somewhat related: I noticed that OpaquePartitionedTridentSpoutExecutor was changed in https://github.com/apache/storm/pull/1995 to fix this spout. It might be good to deprecate the getOrderedPartitions/refreshPartitions methods in 1.x so we can remove them from master. Right now the functionality seems to be duplicated on the Emitter interface, since getPartitionsForTask serves the same purpose.

