Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2017/02/03 23:52:51 UTC

[jira] [Created] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once

Stig Rohde Døssing created STORM-2343:
-----------------------------------------

             Summary: New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once
                 Key: STORM-2343
                 URL: https://issues.apache.org/jira/browse/STORM-2343
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka-client
    Affects Versions: 2.0.0, 1.1.0
            Reporter: Stig Rohde Døssing
            Assignee: Stig Rohde Døssing


It doesn't look like the spout is respecting maxUncommittedOffsets in all cases. If the underlying consumer returns more records in a single call to poll() than maxUncommittedOffsets allows, they will all be added to waitingToEmit. Since poll() may return up to 500 records by default (Kafka 0.10.1.1), this is quite likely to happen when maxUncommittedOffsets is set low.
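
To illustrate the overshoot, here is a minimal sketch in Java. It is not the actual spout code: the class PollOvershootSketch and the method queueAfterPoll are hypothetical names, and the model assumes the limit is checked once before polling and never applied to the size of the returned batch.

```java
import java.util.ArrayList;
import java.util.List;

public class PollOvershootSketch {
    // Hypothetical stand-in for the spout's poll step (not the real implementation).
    // The limit only gates whether a poll happens; it does not cap the batch.
    static int queueAfterPoll(int batchSize, int maxUncommittedOffsets, int numUncommittedOffsets) {
        List<Integer> waitingToEmit = new ArrayList<>();
        if (numUncommittedOffsets < maxUncommittedOffsets) {
            // Every record returned by the poll is queued, however many there are.
            for (int offset = 0; offset < batchSize; offset++) {
                waitingToEmit.add(offset);
            }
        }
        return waitingToEmit.size();
    }

    public static void main(String[] args) {
        // A 500-record batch with a limit of 10 queues all 500 records.
        System.out.println(queueAfterPoll(500, 10, 0));
    }
}
```

With a limit of 10 and a batch of 500, all 500 records end up queued, which is the situation described above.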

I think maxSpoutPending is not effective because there was a recent optimization where the spout will emit as many tuples as possible in one call to nextTuple() (it emits all records in waitingToEmit that aren't already acked, currently emitted or waiting for retry backoff to expire). This probably prevents Storm from moderating the number of emitted tuples.

The spout only checks for tuples to retry if it decides to poll, and it only decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since maxUncommittedOffsets isn't being respected when retrieving or emitting records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. If more than maxUncommittedOffsets messages fail, this can cause the spout to stop polling entirely.
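
The stall can be sketched with a simplified model in Java. The names SpoutStallSketch, shouldPoll and retriedAfter are hypothetical and do not come from the spout. The model assumes the best case, in which every tuple that did not fail has been acked and committed, so the number of uncommitted offsets equals the number of failed tuples.

```java
public class SpoutStallSketch {
    // The polling gate as described in this report.
    static boolean shouldPoll(int numUncommittedOffsets, int maxUncommittedOffsets) {
        return numUncommittedOffsets < maxUncommittedOffsets;
    }

    // Counts how many failed tuples get retried over a number of nextTuple() calls.
    // Assumption: retries are only considered when the spout decides to poll.
    static int retriedAfter(int calls, int failed, int maxUncommittedOffsets) {
        int retried = 0;
        int pendingRetries = failed;
        for (int i = 0; i < calls; i++) {
            // Failed tuples stay uncommitted until they are retried and acked.
            int numUncommittedOffsets = pendingRetries;
            if (shouldPoll(numUncommittedOffsets, maxUncommittedOffsets)) {
                retried += pendingRetries;
                pendingRetries = 0;
            }
            // Otherwise nothing changes, so the next call sees the same state.
        }
        return retried;
    }

    public static void main(String[] args) {
        System.out.println(retriedAfter(1000, 20, 10)); // 20 failures, limit 10: nothing is retried
        System.out.println(retriedAfter(1000, 5, 10));  // 5 failures, limit 10: all 5 are retried
    }
}
```

In this model, once the number of failed tuples reaches the limit, no call to nextTuple() ever polls, so the failed tuples are never retried and the count never drops.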

There also seem to be cases where emit() leaves a record in waitingToEmit even though it should have been removed (e.g. when an already acked record is the only record present in waitingToEmit).


