You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2016/05/05 04:07:12 UTC

[jira] [Reopened] (KAFKA-725) Broker Exception: Attempt to read with a maximum offset less than start offset

     [ https://issues.apache.org/jira/browse/KAFKA-725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao reopened KAFKA-725:
---------------------------

Reopen this jira since the fix exposes a new issue. When the leader switches (say due to leader balancing), the new leader's HW can actually be smaller than the previous leader's HW since HW is propagated asynchronously. The new leader's log end offset is >= than the previous leader's HW and eventually its HW will move to its log end offset. Before that happens, if a consumer fetches data using previous leader's HW, with the patch, the consumer will get OffsetOutOfRangeException and thus has to reset the offset, which is bad. Without the patch, the consumer will get an empty response instead.

So, it seems that we should revert the changes in this patch.

> Broker Exception: Attempt to read with a maximum offset less than start offset
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-725
>                 URL: https://issues.apache.org/jira/browse/KAFKA-725
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>            Assignee: Stig Rohde Døssing
>             Fix For: 0.10.0.0
>
>
> I have a simple consumer that's reading from a single topic/partition pair. Running it seems to trigger these messages on the broker periodically:
> 2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] []  [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
>         at kafka.log.LogSegment.read(LogSegment.scala:105)
>         at kafka.log.Log.read(Log.scala:390)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>         at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:619)
> When I shut the consumer down, I don't see the exceptions anymore.
> This is the code that my consumer is running:
>           while(true) {
>             // we believe the consumer to be connected, so try and use it for a fetch request
>             val request = new FetchRequestBuilder()
>               .addFetch(topic, partition, nextOffset, fetchSize)
>               .maxWait(Int.MaxValue)
>               // TODO for super high-throughput, might be worth waiting for more bytes
>               .minBytes(1)
>               .build
>             debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
>             val messages = connectedConsumer.fetch(request)
>             debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))
>             if (messages.hasError) {
>               warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
>               ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
>             }
>             messages.messageSet(topic, partition).foreach(msg => {
>               watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
>               nextOffset = msg.nextOffset
>             })
>           }
> Any idea what might be causing this error?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)