Posted to dev@storm.apache.org by Punit Tiwan <ti...@coviam.com> on 2017/02/03 19:46:52 UTC

Issue with storm-kafka-client in replaying tuples

Hi,

I am using a storm-kafka-client jar that I built from source, since I want support for Kafka 0.10.1.1. I am testing how my system behaves if one of the workers gets killed while my topology is running.

Configuration used:
A topology with storm-kafka-client as the spout and one bolt to process the messages.
The topology runs on two workers (say W1 and W2). The spout runs on W1 and the bolt runs on W2.
Tuple timeout is 30 seconds.

For testing purposes, storm-kafka-client's max_uncommitted_offset has been set to 40 and max_spout_pending in Storm has been set to 20.

When we start the topology, the spout starts reading messages from Kafka and forwards them to the bolt. As my bolt started processing tuples, I killed W2 on purpose to simulate a crash. While W2 restarts, the spout keeps reading messages from Kafka and emitting them to Storm. So by the time W2 comes back up, some messages have been lost (e.g. tuples 2-5). As the next tuples (6 onwards) reach the bolt, the bolt acknowledges them.
Before we get a failure for the lost tuples, the spout reaches its max_uncommitted_offset and keeps logging that it has reached the maximum uncommitted limit and won’t read any new messages from Kafka.
Now even when it gets a failure for a tuple from Storm, it never replays that tuple. The spout also doesn't read any new messages from Kafka.
This is a big issue for us because my topology is stuck until I restart it.

Another issue is that although I set max_spout_pending to half the value of max_uncommitted_offset, storm-kafka-client fetched a number of records equal to max_uncommitted_offset instead of max_spout_pending.

Can anyone please help me with this?

PS: While debugging we found that if the number of uncommitted offsets becomes equal to or greater than the max uncommitted offsets, the spout stops polling Kafka. It is only in the poll method that the spout asks the retry service whether there are any messages to replay, using the method “doSeekRetriableTopicPartitions”.

Regards,
Punit Tiwan




Re: Issue with storm-kafka-client in replaying tuples

Posted by Stig Rohde Døssing <st...@gmail.com>.
Hi Punit,

I took a look at the nextTuple logic, and I think you're right.

It doesn't look like the spout is actually respecting maxUncommittedOffsets
in all cases. If the underlying consumer returns more records in a call to
poll() than maxUncommittedOffsets, they will all be added to waitingToEmit.
Since poll may return up to 500 records by default, this is pretty likely
to happen with low maxUncommittedOffsets.
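
To make the overrun concrete, here is a minimal sketch in plain Java. It is a simplified model and not the actual storm-kafka-client code: the class and method names are hypothetical, and the capped variant only illustrates one possible way to bound the count, not necessarily how the spout should be fixed.

```java
final class PollOverrunSketch {

    /**
     * Simplified model of the behavior described above: the limit is only
     * checked before polling, and every record the poll returns is kept.
     */
    static int uncommittedAfterPoll(int numUncommitted, int maxUncommitted, int recordsReturnedByPoll) {
        if (numUncommitted < maxUncommitted) {
            numUncommitted += recordsReturnedByPoll; // may jump far past the limit
        }
        return numUncommitted;
    }

    /**
     * Hypothetical alternative (an assumption, not the real fix): only accept
     * as many records as there is room for under the limit.
     */
    static int uncommittedAfterCappedPoll(int numUncommitted, int maxUncommitted, int recordsReturnedByPoll) {
        if (numUncommitted < maxUncommitted) {
            int room = maxUncommitted - numUncommitted;
            numUncommitted += Math.min(room, recordsReturnedByPoll);
        }
        return numUncommitted;
    }

    public static void main(String[] args) {
        // Values from this thread: limit of 40, poll returning up to 500 records.
        System.out.println("uncapped: " + uncommittedAfterPoll(0, 40, 500));
        System.out.println("capped:   " + uncommittedAfterCappedPoll(0, 40, 500));
    }
}
```

With a limit of 40 and a single poll returning 500 records, the uncapped model ends at 500 uncommitted offsets, while the capped variant stops at 40.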

I think maxSpoutPending is not effective because there was a recent
optimization where the spout will emit as many tuples as possible in one
call to nextTuple() (it emits all records in waitingToEmit that aren't
already acked, currently emitted or waiting for retry backoff to expire).
This probably prevents Storm from moderating the number of emitted tuples.
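
The interaction can be sketched as follows. This is a toy model under stated assumptions, not Storm's executor code: it assumes Storm only checks the pending count before each call to nextTuple(), and the class and method names are hypothetical.

```java
final class EmitAllSketch {

    /**
     * Simulates a caller that invokes nextTuple() only while the number of
     * pending tuples is below maxSpoutPending (assumed gating behavior).
     *
     * @param waitingToEmit  records already fetched and ready to emit
     * @param emitAllPerCall true if one nextTuple() call emits everything waiting,
     *                       false if it emits a single tuple per call
     * @return number of pending (emitted, not yet acked) tuples when the loop stops
     */
    static int pendingAfterLoop(int waitingToEmit, int maxSpoutPending, boolean emitAllPerCall) {
        int pending = 0;
        int waiting = waitingToEmit;
        while (waiting > 0 && pending < maxSpoutPending) { // gate is checked between calls only
            int emitted = emitAllPerCall ? waiting : 1;     // body of one nextTuple() call
            waiting -= emitted;
            pending += emitted;
        }
        return pending;
    }

    public static void main(String[] args) {
        // Values from this thread: 40 records waiting, maxSpoutPending of 20.
        System.out.println("emit all per call: " + pendingAfterLoop(40, 20, true));
        System.out.println("emit one per call: " + pendingAfterLoop(40, 20, false));
    }
}
```

With 40 records waiting and a limit of 20, the emit-all variant ends with 40 pending tuples because the gate is never consulted mid-call, while the one-per-call variant stops at 20. That would be consistent with the 40 records reported in the original message.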

The spout only checks for tuples to retry if it decides to poll, and it
only decides to poll if numUncommittedOffsets < maxUncommittedOffsets.
Since maxUncommittedOffsets isn't being respected when retrieving or
emitting records, numUncommittedOffsets can be much larger than
maxUncommittedOffsets. If more than maxUncommittedOffsets messages fail,
this can cause the spout to stop polling entirely.
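
A minimal sketch of that stall, again as a simplified model rather than the real KafkaSpout: all names here are hypothetical, and the retry service is reduced to a sorted set of failed offsets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class RetryStallSketch {
    final int maxUncommitted;
    int numUncommitted;
    final TreeSet<Long> retryable = new TreeSet<>(); // failed offsets waiting for replay
    final List<Long> replayed = new ArrayList<>();   // offsets that were re-emitted

    RetryStallSketch(int maxUncommitted, int numUncommitted) {
        this.maxUncommitted = maxUncommitted;
        this.numUncommitted = numUncommitted;
    }

    /** Storm reports a failed tuple; the offset is queued for retry. */
    void fail(long offset) {
        retryable.add(offset);
    }

    /**
     * One simplified nextTuple() call. Retries are only examined on the poll
     * path, and the poll path is only taken while under the limit.
     *
     * @return true if the poll path ran
     */
    boolean nextTuple() {
        if (numUncommitted < maxUncommitted) {
            replayed.addAll(retryable); // stand-in for seeking back and re-emitting
            retryable.clear();
            return true;
        }
        return false; // at or over the limit: no poll, so no retry check either
    }

    public static void main(String[] args) {
        RetryStallSketch spout = new RetryStallSketch(40, 40);
        for (long offset = 2; offset <= 5; offset++) {
            spout.fail(offset); // the tuples lost when W2 was killed
        }
        for (int i = 0; i < 100; i++) {
            spout.nextTuple();
        }
        System.out.println("replayed: " + spout.replayed + ", still waiting: " + spout.retryable);
    }
}
```

In this model the failed offsets can never be committed, so numUncommittedOffsets never drops back under the limit, and no number of nextTuple() calls replays them. Checking for retriable tuples independently of the polling decision would be one way out, though that is only an assumption here.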

I've filed a JIRA here https://issues.apache.org/jira/browse/STORM-2343,
and will take a look at getting it fixed soon. Thanks for pointing out this
bad behavior :)
