Posted to user@storm.apache.org by "Mitchell Rathbun (BLOOMBERG/ 731 LEX)" <mr...@bloomberg.net> on 2019/11/05 21:58:44 UTC

FirstPollOffsetStrategy LATEST not working as expected

We have a topology with 4 instances of KafkaSpout running as part of the same consumer group. Currently, we run only one worker process per topology, so all of the spout instances are in the same process. We have noticed that when the process crashes or is killed, the KafkaSpout resumes from the last committed offset when it restarts, not from the most recent offset. The code confirms this: https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L255 If isOffsetCommittedByThisTopology is true, then the KafkaConsumer seeks to the last committed offset. To me, this is how UNCOMMITTED_LATEST should behave. So the question is: why does isOffsetCommittedByThisTopology return true if the topology just crashed? Shouldn't the new topology instance be treated separately from the old one?

Re: FirstPollOffsetStrategy LATEST not working as expected

Posted by Stig Rohde Døssing <st...@gmail.com>.
No, not for a while now. See
https://github.com/apache/storm/blob/v2.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java#L30.
The spout only ignores the committed offset once you redeploy the topology.
Since only your worker is crashing, the topology hasn't changed; it's just a
recovering worker in the same topology.

The idea was for UNCOMMITTED_LATEST to allow you to start at latest, and
then have the committed offset persist if you redeploy. The idea with
LATEST was to start at latest, and then ignore the committed offset and
start at latest again if you redeploy. If you need to restart at latest on
crash as well, that should be a new setting.
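
To make the distinction concrete, here is a minimal sketch of the decision
described above. It is a simplified model, not the Storm source: the class
name, the SeekTarget enum, and the decide() method are hypothetical, and only
the four strategy names are taken from FirstPollOffsetStrategy. It assumes the
spout can tell whether a committed offset was written by the currently
deployed topology.

```java
// Simplified model of the first-poll seek decision discussed in this thread.
// NOT the Storm implementation; SeekTarget and decide() are hypothetical names.
class OffsetStrategySketch {

    // Value names mirror Storm's FirstPollOffsetStrategy enum.
    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    // Where the consumer would seek for a partition on first poll.
    enum SeekTarget { BEGINNING, END, COMMITTED }

    static SeekTarget decide(Strategy strategy,
                             boolean hasCommittedOffset,
                             boolean committedByThisTopology) {
        if (hasCommittedOffset) {
            if (committedByThisTopology) {
                // Worker restart inside the same deployment: always resume.
                return SeekTarget.COMMITTED;
            }
            // Offset was committed by an earlier deployment (a redeploy).
            switch (strategy) {
                case EARLIEST: return SeekTarget.BEGINNING;
                case LATEST:   return SeekTarget.END;
                default:       return SeekTarget.COMMITTED; // UNCOMMITTED_*
            }
        }
        // Nothing committed yet for this consumer group.
        switch (strategy) {
            case EARLIEST:
            case UNCOMMITTED_EARLIEST:
                return SeekTarget.BEGINNING;
            default:
                return SeekTarget.END;
        }
    }

    public static void main(String[] args) {
        // Worker crash with LATEST: committed offset belongs to this topology.
        System.out.println(decide(Strategy.LATEST, true, true));
        // Redeploy with LATEST: committed offset is ignored.
        System.out.println(decide(Strategy.LATEST, true, false));
        // Redeploy with UNCOMMITTED_LATEST: committed offset persists.
        System.out.println(decide(Strategy.UNCOMMITTED_LATEST, true, false));
    }
}
```

In this model, a crashed worker always takes the first branch, which is why
LATEST and UNCOMMITTED_LATEST look identical after a crash and differ only
after a redeploy.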

On Tue, Nov 5, 2019 at 22:58, Mitchell Rathbun (BLOOMBERG/ 731 LEX)
<mr...@bloomberg.net> wrote:

> We have a topology that has 4 instances of KafkaSpout running as part of
> the same consumer group. Currently, we are only running one worker process
> per topology, so all of the spout instances are in the same process. We
> have noticed that when a process crashes/is killed, the KafkaSpout picks up
> from the last committed offset when it restarts, not the most recent
> offset. This is supported when looking in the code:
> https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L255
> If isOffsetCommittedByThisTopology is true, then the KafkaConsumer seeks to
> the last committed offset. To me, this is behaving like UNCOMMITTED_LATEST
> should. I guess the question is why does isOffsetCommittedByThisTopology
> return true if the topology just crashed. Shouldn't the new topology
> instance be treated separately from the old one?
>