You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2016/04/01 15:15:25 UTC

[jira] [Commented] (KAFKA-725) Broker Exception: Attempt to read with a maximum offset less than start offset

    [ https://issues.apache.org/jira/browse/KAFKA-725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15221673#comment-15221673 ] 

Stig Rohde Døssing commented on KAFKA-725:
------------------------------------------

We're seeing this on 8.2.2. I'm not sure Log/LogSegment handles the high watermark as gracefully as they maybe could.

My guess at how it's happening:
Assume an ISR of at least 2.
A consumer (in our case the Storm KafkaSpout) reads up to the end of the committed log, say up to message 5. 
The leader for the relevant partition then receives one or more messages (6). 
Before the new message(s) are replicated, the consumer increments its offset and fetches (from 6). 
The leader receives the fetch, sets the maxOffset for read to the high watermark (5), and compares the end of the log to the requested offset (see https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/log/Log.scala#L482). This check passes because the end of the log is at 6.
When the read on LogSegment is reached, it will error out when the maxOffset is smaller than the start offset, which causes this error log. https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/log/LogSegment.scala#L146

Maybe the check in Log should include whether the maxOffset is larger than offset as well?

> Broker Exception: Attempt to read with a maximum offset less than start offset
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-725
>                 URL: https://issues.apache.org/jira/browse/KAFKA-725
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>            Assignee: Jay Kreps
>
> I have a simple consumer that's reading from a single topic/partition pair. Running it seems to trigger these messages on the broker periodically:
> 2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] []  [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
>         at kafka.log.LogSegment.read(LogSegment.scala:105)
>         at kafka.log.Log.read(Log.scala:390)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>         at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:619)
> When I shut the consumer down, I don't see the exceptions anymore.
> This is the code that my consumer is running:
>           while(true) {
>             // we believe the consumer to be connected, so try and use it for a fetch request
>             val request = new FetchRequestBuilder()
>               .addFetch(topic, partition, nextOffset, fetchSize)
>               .maxWait(Int.MaxValue)
>               // TODO for super high-throughput, might be worth waiting for more bytes
>               .minBytes(1)
>               .build
>             debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
>             val messages = connectedConsumer.fetch(request)
>             debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))
>             if (messages.hasError) {
>               warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
>               ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
>             }
>             messages.messageSet(topic, partition).foreach(msg => {
>               watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
>               nextOffset = msg.nextOffset
>             })
>           }
> Any idea what might be causing this error?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)