You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2013/01/14 05:22:12 UTC

[jira] [Updated] (KAFKA-698) broker may expose uncommitted data to a consumer

     [ https://issues.apache.org/jira/browse/KAFKA-698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-698:
--------------------------

    Attachment: kafka-699.patch

Attach a patch. It removes replicaId from javaapi.FetchRequest and restricts the scope of the constructor in scala FetchRequest that sets replicaId.
                
> broker may expose uncommitted data to a consumer
> ------------------------------------------------
>
>                 Key: KAFKA-698
>                 URL: https://issues.apache.org/jira/browse/KAFKA-698
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Priority: Blocker
>
> We saw the following error in the log during testing. The problem seems to be that when the high watermark was at offset 39021, the broker incorrectly exposed an uncommitted message (at offset 39022) to the client. This doesn't always happen, but can happen when certain conditions are met, which I should explain in the comments.
> 2013/01/11 00:54:42.059 ERROR [KafkaApis] [kafka-request-handler-2] [kafka] []  [KafkaApi-277] error when processing request (service_metrics,2,39022,2000000)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset (39021) less than the start offset (39022).
>         at kafka.log.LogSegment.read(LogSegment.scala:105)
>         at kafka.log.Log.read(Log.scala:386)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:369)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>         at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$3.apply(KafkaApis.scala:186)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$3.apply(KafkaApis.scala:185)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:619)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira