Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2017/08/01 12:35:01 UTC

[jira] [Commented] (STORM-2625) KafkaSpout is not calculating uncommitted correctly

    [ https://issues.apache.org/jira/browse/STORM-2625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108816#comment-16108816 ] 

Stig Rohde Døssing commented on STORM-2625:
-------------------------------------------

[~GuangDu] please note that manual subscription is not supported until version 1.2.0 (1.x branch). If you use the spout from that branch, it will use manual subscription by default.

> KafkaSpout is not calculating uncommitted correctly
> ---------------------------------------------------
>
>                 Key: STORM-2625
>                 URL: https://issues.apache.org/jira/browse/STORM-2625
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.1
>            Reporter: Guang Du
>            Priority: Minor
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> This happens when:
> 1. KafkaSpout has already committed offsets to a topic, and is not currently running/activated;
> 2. There are messages in the topic after the committed offsets;
> 3. A topology with the same consumer group and multiple workers is started/activated again.
> The same issue can happen when a running topology gets a consumer group partition reassignment and offsets cannot be committed in time.
> The underlying issue is:
> a. Because workers register Kafka consumers one by one, when the first consumer A registers itself with the Kafka broker for the consumer group, it is assigned all the partitions, say partitions 0 and 1. Consumer A then retrieves messages from all assigned partitions and starts processing. With every tuple KafkaSpout A emits, the uncommitted count numUncommittedOffsets is incremented (KafkaSpout#emitTupleIfNotEmitted()).
> b. At this point a second consumer B registers with the broker for the same consumer group. The broker then reassigns the partitions among the existing consumers, say consumer A is assigned partition 0 and consumer B is assigned partition 1.
> b.1 At this point KafkaSpout A will try to commit acked offsets and remove the partition 1 offsets it is tracking (KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()). However, because not all tuples are acked, KafkaSpout cannot commit the full list of offsets to the Kafka broker.
> b.2 KafkaSpout A will then remove the tracked partition 1 offsets from offsetManagers as well as from emitted (org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned(), org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()), so the unacked tuples can never be acked (org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count numUncommittedOffsets is never reduced back to a correct value.
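The accounting problem described in the report can be sketched in isolation. The following is a simplified, hypothetical model of the spout's bookkeeping, not the real Storm code: the field name numUncommittedOffsets mirrors the one discussed above, while the class and method names here are invented for illustration. The buggy revoke path drops the per-partition tracking state (as onPartitionsRevoked/initialize do) but never subtracts the dropped offsets from the global counter, so the counter stays inflated forever:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the offset accounting; not the actual KafkaSpout code.
public class UncommittedCountSketch {
    private int numUncommittedOffsets = 0;
    // partition -> number of emitted-but-unacked offsets tracked for it
    private final Map<Integer, Integer> emittedPerPartition = new HashMap<>();

    // Analogous to emitTupleIfNotEmitted(): every emit bumps the global counter.
    void emit(int partition) {
        emittedPerPartition.merge(partition, 1, Integer::sum);
        numUncommittedOffsets++;
    }

    // Buggy revoke path: per-partition state is dropped, but the
    // global counter is left untouched.
    void revokePartitionBuggy(int partition) {
        emittedPerPartition.remove(partition);
    }

    // A fixed version would subtract the offsets it stops tracking.
    void revokePartitionFixed(int partition) {
        Integer dropped = emittedPerPartition.remove(partition);
        if (dropped != null) {
            numUncommittedOffsets -= dropped;
        }
    }

    int uncommitted() {
        return numUncommittedOffsets;
    }

    public static void main(String[] args) {
        UncommittedCountSketch spoutA = new UncommittedCountSketch();
        spoutA.emit(0); spoutA.emit(0); // two tuples from partition 0
        spoutA.emit(1); spoutA.emit(1); // two tuples from partition 1
        // Rebalance: partition 1 moves to consumer B. The buggy path never
        // reduces the counter, so spout A still "sees" 4 uncommitted offsets.
        spoutA.revokePartitionBuggy(1);
        System.out.println(spoutA.uncommitted()); // prints 4, should be 2
    }
}
```

With the counter stuck above maxUncommittedOffsets, the spout would eventually stop polling even though partition 1 is no longer its responsibility, which matches the symptom in the issue title.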



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)