Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/09/17 18:32:04 UTC

[jira] [Commented] (STORM-643) KafkaUtils repeatedly fetches messages whose offset is out of range

    [ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14803176#comment-14803176 ] 

ASF GitHub Bot commented on STORM-643:
--------------------------------------

Github user prokopowicz commented on the pull request:

    https://github.com/apache/storm/pull/642#issuecomment-141141208
  
    We've seen a related problem when a custom scheme returns an empty list of lists from generateTuples: the code in PartitionManager does not ack the offset in that case. Either it should be made clear that a scheme must return null, not an empty list of lists, when it deserializes no objects, or this code in PartitionManager should be fixed to treat an empty list of lists the same way it treats a null list.
    
    ```java
    if (tups != null) {
        if (_spoutConfig.topicAsStreamId) {
            for (List<Object> tup : tups) {
                collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset));
            }
        } else {
            for (List<Object> tup : tups) {
                collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
            }
        }
        break;
    } else {
        ack(toEmit.offset);
    }
    ```
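
    The ack-decision buried in that snippet can be isolated as a small predicate. This is a minimal sketch of the proposed fix, not the actual storm-kafka code: `shouldAck` is a hypothetical helper that treats an empty list of tuples the same as null, so offsets that deserialize to nothing are acked instead of stalling.

    ```java
    import java.util.Collections;
    import java.util.List;

    public class EmptyTupleCheck {
        // Hypothetical helper illustrating the suggested fix: ack the offset
        // whenever the scheme produced no tuples, whether it returned null
        // or an empty list of lists.
        static boolean shouldAck(List<List<Object>> tups) {
            return tups == null || tups.isEmpty();
        }

        public static void main(String[] args) {
            System.out.println(shouldAck(null));                    // nothing deserialized: ack
            System.out.println(shouldAck(Collections.emptyList())); // also ack under the proposed fix
            System.out.println(shouldAck(Collections.singletonList(
                    Collections.<Object>singletonList("payload")))); // tuples to emit: don't ack yet
        }
    }
    ```

    With the current code, only the null branch acks, so an empty list leaves the offset permanently un-acked.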


> KafkaUtils repeatedly fetches messages whose offset is out of range
> -------------------------------------------------------------------
>
>                 Key: STORM-643
>                 URL: https://issues.apache.org/jira/browse/STORM-643
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.9.2-incubating, 0.9.3, 0.10.0, 0.9.4, 0.9.5
>            Reporter: Xin Wang
>            Assignee: Xin Wang
>             Fix For: 0.9.6
>
>
> KafkaUtils repeatedly fetches messages whose offset is out of range.
> This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.
> [worker-log]
> {code}
> 2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
> 2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
> ...
> {code}
> [FIX]
> {code}
> storm.kafka.PartitionManager.fill():
> ...
> try {
>     msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
> } catch (UpdateOffsetException e) {
>     _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
>     LOG.warn("Using new offset: {}", _emittedToOffset);
>     // fetch failed, so don't update the metrics
>     // fix bug: remove this offset from the failed list when it is out of range
>     if (had_failed) {
>         failed.remove(offset);
>     }
>     return;
> }
> ...
> {code}
> Also: the log message "retrying with default start offset time from configuration. configured start offset time: [-2]" is incorrect.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)