Posted to issues@storm.apache.org by "SHRAVANKUMAR DUBBUDU (JIRA)" <ji...@apache.org> on 2016/12/15 16:15:58 UTC

[jira] [Closed] (STORM-2241) KafkaSpout implementation

     [ https://issues.apache.org/jira/browse/STORM-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

SHRAVANKUMAR DUBBUDU closed STORM-2241.
---------------------------------------
    Resolution: Fixed

The spout implementation looks good. It processes one tuple in each nextTuple() call.
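
The round-robin behaviour described above can be illustrated with a minimal, self-contained sketch. It is not the storm-kafka code: RoundRobinSpoutSketch, Partition, and the in-memory list used as a collector are hypothetical stand-ins, and the sketch assumes messages are already buffered, so the Kafka fetch that a real partition manager may perform is not modelled. Only the control flow of the quoted nextTuple() is mirrored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Simplified, hypothetical model of the round-robin loop quoted in this issue.
// None of these types are Storm classes; they only mirror the control flow.
final class RoundRobinSpoutSketch {

    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    // Stand-in for a partition manager: holds already-fetched messages in memory.
    static final class Partition {
        private final Deque<String> buffered = new ArrayDeque<>();

        Partition(String... messages) {
            buffered.addAll(Arrays.asList(messages));
        }

        // Emits at most one buffered message and reports what is left.
        EmitState next(List<String> collector) {
            String message = buffered.poll();
            if (message == null) {
                return EmitState.NO_EMITTED;
            }
            collector.add(message);
            return buffered.isEmpty() ? EmitState.EMITTED_END : EmitState.EMITTED_MORE_LEFT;
        }
    }

    private final List<Partition> partitions;
    private final List<String> collector = new ArrayList<>();
    private int currPartitionIndex = 0;

    RoundRobinSpoutSketch(List<Partition> partitions) {
        this.partitions = partitions;
    }

    // Visits each partition at most once per call and stops after the first emit.
    void nextTuple() {
        for (int i = 0; i < partitions.size(); i++) {
            // Guard against the partition count having shrunk since the last call.
            currPartitionIndex = currPartitionIndex % partitions.size();
            EmitState state = partitions.get(currPartitionIndex).next(collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                // Partition drained or empty: move on to the next one.
                currPartitionIndex = (currPartitionIndex + 1) % partitions.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break; // one message emitted, return control to the caller
            }
        }
    }

    List<String> emitted() {
        return collector;
    }

    public static void main(String[] args) {
        RoundRobinSpoutSketch spout = new RoundRobinSpoutSketch(Arrays.asList(
                new Partition("a1", "a2"), new Partition(), new Partition("c1")));
        for (int call = 1; call <= 4; call++) {
            spout.nextTuple();
            System.out.println("after call " + call + ": " + spout.emitted());
        }
    }
}
```

Running main prints the collector contents after each call. Each call adds at most one message, empty partitions are skipped within the same call, and the fourth call adds nothing because every partition has been drained.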

> KafkaSpout implementation
> -------------------------
>
>                 Key: STORM-2241
>                 URL: https://issues.apache.org/jira/browse/STORM-2241
>             Project: Apache Storm
>          Issue Type: Question
>          Components: storm-kafka
>    Affects Versions: 0.10.0
>            Reporter: SHRAVANKUMAR DUBBUDU
>            Priority: Minor
>
> The Storm ISpout documentation says: 'Storm executes ack, fail, and nextTuple all on the same thread. This means that an implementor of an ISpout does not need to worry about concurrency issues between those methods. However, it also means that an implementor must ensure that nextTuple is non-blocking: otherwise the method could block acks and fails that are pending to be processed.'
> However, KafkaSpout has the following nextTuple() implementation:
>     @Override
>     public void nextTuple() {
>         List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
>         for (int i = 0; i < managers.size(); i++) {
>             try {
>                 // in case the number of managers decreased
>                 _currPartitionIndex = _currPartitionIndex % managers.size();
>                 EmitState state = managers.get(_currPartitionIndex).next(_collector);
>                 if (state != EmitState.EMITTED_MORE_LEFT) {
>                     _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
>                 }
>                 if (state != EmitState.NO_EMITTED) {
>                     break;
>                 }
>             } catch (FailedFetchException e) {
>                 LOG.warn("Fetch failed", e);
>                 _coordinator.refresh();
>             }
>         }
>         long now = System.currentTimeMillis();
>         if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
>             commit();
>         }
>     }
> We are seeing events replayed when there is a slow bolt in the topology chain, which causes duplicate messages.
> Is there any way this can be fixed?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)