Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2017/07/14 21:25:00 UTC
[jira] [Commented] (STORM-2624) Kafka Storm Spout: Got fetch request with offset out of range
[ https://issues.apache.org/jira/browse/STORM-2624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088106#comment-16088106 ]
Stig Rohde Døssing commented on STORM-2624:
-------------------------------------------
Hi [~sergiyk]. Please open a PR at https://github.com/apache/storm. The code is more likely to get reviewed there, and it can be validated via the CI setup. Thanks :)
> Kafka Storm Spout: Got fetch request with offset out of range
> -------------------------------------------------------------
>
> Key: STORM-2624
> URL: https://issues.apache.org/jira/browse/STORM-2624
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka
> Affects Versions: 1.0.1, 1.0.2, 1.1.0
> Reporter: Sergiy Kharytesku
>
> If a partition offset is out of range, the Kafka spout stops emitting new messages and keeps logging the following warning:
> 2016-10-26 11:11:31.070 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
> 2016-10-26 11:11:31.078 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
> ...
> I believe the trivial fix is in the fill method of PartitionManager.java.
> line 237:
> {code:java}
> long partitionLatestOffset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.LatestTime());
> if (partitionLatestOffset < offset) {
>     offset = partitionLatestOffset;
> } else {
>     offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
> }
> {code}
> change to:
> {code:java}
> offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, _spoutConfig.startOffsetTime);
> {code}
> line 259:
> {code:java}
> if (offset > _emittedToOffset) {
>     _lostMessageCount.incrBy(offset - _emittedToOffset);
>     _emittedToOffset = offset;
>     LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
> }
> {code}
> change to:
> {code:java}
> if (offset > _emittedToOffset) {
>     _lostMessageCount.incrBy(offset - _emittedToOffset);
> }
> _emittedToOffset = offset;
> LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
> {code}
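
For anyone reviewing the proposal, the difference between the current and the suggested behaviour can be sketched without Storm or Kafka on the classpath. The class below is a simplified, hypothetical model and not the actual PartitionManager code: OffsetResetSketch, State, resolveOriginal, resolveProposed, applyOriginal and applyProposed are names invented for illustration, and the startFromLatest flag only stands in for the earliest/latest cases of _spoutConfig.startOffsetTime. The scenario in main (spout position 3, broker range 0 to 2) is an assumption loosely based on the log lines in the report.

```java
// Hypothetical, simplified model of the offset handling discussed in this issue.
// It does not use Storm or Kafka classes; every name here is illustrative.
final class OffsetResetSketch {

    /** Minimal stand-in for the spout's bookkeeping fields. */
    static final class State {
        long emittedToOffset;
        long lostMessageCount;

        State(long emittedToOffset) {
            this.emittedToOffset = emittedToOffset;
        }
    }

    /** Offset choice as in the code around line 237: latest if it is behind the request, else earliest. */
    static long resolveOriginal(long requested, long earliest, long latest) {
        return latest < requested ? latest : earliest;
    }

    /** Offset choice as proposed: follow the configured start position (earliest/latest only in this model). */
    static long resolveProposed(long earliest, long latest, boolean startFromLatest) {
        return startFromLatest ? latest : earliest;
    }

    /** Update as in the code around line 259: the position is only ever moved forward. */
    static void applyOriginal(State state, long newOffset) {
        if (newOffset > state.emittedToOffset) {
            state.lostMessageCount += newOffset - state.emittedToOffset;
            state.emittedToOffset = newOffset;
        }
    }

    /** Update as proposed: count skipped messages only when moving forward, but always adopt the new offset. */
    static void applyProposed(State state, long newOffset) {
        if (newOffset > state.emittedToOffset) {
            state.lostMessageCount += newOffset - state.emittedToOffset;
        }
        state.emittedToOffset = newOffset;
    }

    public static void main(String[] args) {
        // Assumed scenario: the spout asks for offset 3, but the broker only holds offsets 0..2.
        long earliest = 0;
        long latest = 2;

        State original = new State(3);
        applyOriginal(original, resolveOriginal(3, earliest, latest));
        System.out.println("original emittedToOffset = " + original.emittedToOffset);

        State proposed = new State(3);
        applyProposed(proposed, resolveProposed(earliest, latest, false));
        System.out.println("proposed emittedToOffset = " + proposed.emittedToOffset);
    }
}
```

In this model the original update leaves the position at the out-of-range offset whenever the resolved offset is smaller, which would match the repeated warning in the report, while the proposed update moves the position back to a valid offset. Whether that is the right behaviour for every startOffsetTime value is something the PR review would need to confirm.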
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)