Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2017/07/14 21:25:00 UTC

[jira] [Commented] (STORM-2624) Kafka Storm Spout: Got fetch request with offset out of range

    [ https://issues.apache.org/jira/browse/STORM-2624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088106#comment-16088106 ] 

Stig Rohde Døssing commented on STORM-2624:
-------------------------------------------

Hi [~sergiyk]. Please open a PR at https://github.com/apache/storm. The code is more likely to get reviewed there, and it can be validated by the CI setup. Thanks :)

> Kafka Storm Spout: Got fetch request with offset out of range
> -------------------------------------------------------------
>
>                 Key: STORM-2624
>                 URL: https://issues.apache.org/jira/browse/STORM-2624
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 1.0.1, 1.0.2, 1.1.0
>            Reporter: Sergiy Kharytesku
>
> If the partition offset is out of range, the Kafka spout stops emitting new messages and keeps logging the following warning:
> 2016-10-26 11:11:31.070 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
> 2016-10-26 11:11:31.078 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
> ...
> I believe the trivial fix is in the fill method of PartitionManager.java.
> line 237:
> {code:java}
>             long partitionLatestOffset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.LatestTime());
>             if (partitionLatestOffset < offset) {
>                 offset = partitionLatestOffset;
>             } else {
>                 offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
>             }
> {code}
> change to:
> {code:java}
>             offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, _spoutConfig.startOffsetTime);
> {code}
> line 259:
> {code:java}
>             if (offset > _emittedToOffset) {
>                 _lostMessageCount.incrBy(offset - _emittedToOffset);
>                 _emittedToOffset = offset;
>                 LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
>             }
> {code}
> change to:
> {code:java}
>             if (offset > _emittedToOffset) {
>                 _lostMessageCount.incrBy(offset - _emittedToOffset);
>             }
>             _emittedToOffset = offset;
>             LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
> {code}
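
The effect of the two quoted changes can be illustrated with a minimal, standalone sketch. It is not Storm code: the class, the methods and the Range type are hypothetical stand-ins for the quoted snippets, and the scenario (the emitted-to offset lying beyond the broker's latest offset, for example after a topic was recreated) is an assumption rather than something the issue states.

```java
/** Hypothetical model of the offset reset logic quoted in the issue; not Storm's API. */
public class OffsetResetSketch {

    /** Assumed stand-in for the earliest and latest offsets the broker reports for a partition. */
    public static final class Range {
        public final long earliest;
        public final long latest;

        public Range(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }
    }

    /**
     * Models the quoted "before" snippets: choose latest or earliest as the
     * reset offset, then move the emitted-to offset only if that is a step forward.
     */
    public static long currentBehaviour(long emittedToOffset, long failedOffset, Range range) {
        long offset = (range.latest < failedOffset) ? range.latest : range.earliest;
        if (offset > emittedToOffset) {
            emittedToOffset = offset;
        }
        return emittedToOffset;
    }

    /**
     * Models the quoted "after" snippets: the emitted-to offset is always set to
     * the offset resolved from the configured start time, even if that moves it backwards.
     */
    public static long proposedBehaviour(long emittedToOffset, long configuredStartOffset) {
        return configuredStartOffset;
    }

    public static void main(String[] args) {
        Range range = new Range(0L, 100L); // assumed valid range on the broker
        long emittedTo = 500L;             // assumed stale position beyond that range

        // The reset offset (100) is not greater than 500, so the position never changes.
        System.out.println(currentBehaviour(emittedTo, emittedTo, range)); // prints 500

        // The position is moved to the configured start offset (here: earliest).
        System.out.println(proposedBehaviour(emittedTo, range.earliest));  // prints 0
    }
}
```

Under these assumptions the current logic leaves the position at an offset the broker keeps rejecting, which would match the repeated warning in the report, while the proposed logic moves it back into the valid range.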


