Posted to dev@kafka.apache.org by "Maciek Makowski (JIRA)" <ji...@apache.org> on 2015/12/29 15:30:49 UTC

[jira] [Updated] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

     [ https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maciek Makowski updated KAFKA-3047:
-----------------------------------
    Description: 
{{Log.append()}} has an {{assignOffsets}} parameter which, when set to false, should cause Kafka to use the offsets specified in the {{ByteBufferMessageSet}} rather than recalculating them based on {{nextOffsetMetadata}}. However, in that function {{appendInfo.firstOffset}} is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can corrupt the log in the following scenario:

* {{nextOffsetMetadata.messageOffset}} is 2001
* {{append(messageSet, assignOffsets = false)}} is called, where {{messageSet}} contains offsets 1001...1500 
* after the {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, {{appendInfo.firstOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
* after the {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} assignment, {{appendInfo.firstOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
* the consistency check {{if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} passes (the second condition can never trigger because of the unconditional assignment) and the write proceeds
* the message set is appended to the current log segment starting at offset 2001, but the offsets stored in the set are 1001...1500
* the system shuts down abruptly
* on restart, the following unrecoverable error is reported: 

{code}
Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to append an offset (1001) to position 12345 no larger than the last offset appended (1950) to xyz/00000000000000000000.index.
  at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
  at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
  at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
  at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
  at kafka.log.LogSegment.recover(LogSegment.scala:188)
  at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
  at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
  at kafka.log.Log.loadSegments(Log.scala:160)
  at kafka.log.Log.<init>(Log.scala:90)
  at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
  at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
  at java.util.concurrent.FutureTask.run(FutureTask.java:166)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
  at java.lang.Thread.run(Thread.java:722)
{code} 

*Proposed fix:* the assignment {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} should only happen in the {{if (assignOffsets)}} branch of the code.
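For illustration, here is a minimal, self-contained Scala sketch of the offset check (the names mirror {{Log.append()}} but this is not the actual Kafka source); it reproduces the numbers from the scenario above and shows how moving the assignment into the {{assignOffsets}} branch would let the check reject the out-of-order message set instead of silently accepting it:

{code}
// Standalone model of the consistency check in Log.append(); not the Kafka code itself.
object OffsetCheckDemo {
  // firstOffset/lastOffset as computed by analyzeAndValidateMessageSet
  case class AppendInfo(var firstOffset: Long, var lastOffset: Long, offsetsMonotonic: Boolean = true)

  def append(info: AppendInfo, logEndOffset: Long, assignOffsets: Boolean, withFix: Boolean): String = {
    // today the assignment is unconditional; the proposed fix limits it to assignOffsets == true
    if (assignOffsets || !withFix)
      info.firstOffset = logEndOffset
    if (!info.offsetsMonotonic || info.firstOffset < logEndOffset)
      "rejected: out of order offsets"  // what should happen in the scenario above
    else
      "accepted"                        // what happens today -> corrupted segment on disk
  }

  def main(args: Array[String]): Unit = {
    // message set carries offsets 1001...1500 while the log end offset is 2001
    println(append(AppendInfo(1001, 1500), 2001, assignOffsets = false, withFix = false)) // accepted
    println(append(AppendInfo(1001, 1500), 2001, assignOffsets = false, withFix = true))  // rejected: out of order offsets
  }
}
{code}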

  was:
{{Log.append()}} has an {{assignOffsets}} parameter which, when set to false, should cause Kafka to use the offsets specified in the {{ByteBufferMessageSet}} rather than recalculating them based on {{nextOffsetMetadata}}. However, in that function {{appendInfo.firstOffset}} is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can corrupt the log in the following scenario:

* {{nextOffsetMetadata.messageOffset}} is 2001
* {{append(messageSet, assignOffsets = false)}} is called, where {{messageSet}} contains offsets 1001...1500 
* after the {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, {{appendInfo.firstOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
* after the {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} assignment, {{appendInfo.firstOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
* the consistency check {{if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} passes (the second condition can never trigger because of the unconditional assignment) and the write proceeds
* the message set is appended to the current log segment starting at offset 2001, but the offsets stored in the set are 1001...1500
* the system shuts down abruptly
* on recovery, the following unrecoverable error is reported: 

{code}
Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to append an offset (1001) to position 12345 no larger than the last offset appended (1950) to xyz/00000000000000000000.index.
  at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
  at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
  at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
  at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
  at kafka.log.LogSegment.recover(LogSegment.scala:188)
  at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
  at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
  at kafka.log.Log.loadSegments(Log.scala:160)
  at kafka.log.Log.<init>(Log.scala:90)
  at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
  at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
  at java.util.concurrent.FutureTask.run(FutureTask.java:166)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
  at java.lang.Thread.run(Thread.java:722)
{code} 

*Proposed fix:* the assignment {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} should only happen in the {{if (assignOffsets)}} branch of the code.


> Explicit offset assignment in Log.append can corrupt the log
> ------------------------------------------------------------
>
>                 Key: KAFKA-3047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3047
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.9.0.0
>            Reporter: Maciek Makowski
>            Assignee: Jay Kreps
>
> {{Log.append()}} has an {{assignOffsets}} parameter which, when set to false, should cause Kafka to use the offsets specified in the {{ByteBufferMessageSet}} rather than recalculating them based on {{nextOffsetMetadata}}. However, in that function {{appendInfo.firstOffset}} is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can corrupt the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where {{messageSet}} contains offsets 1001...1500 
> * after the {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, {{appendInfo.firstOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after the {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} assignment, {{appendInfo.firstOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * the consistency check {{if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} passes (the second condition can never trigger because of the unconditional assignment) and the write proceeds
> * the message set is appended to the current log segment starting at offset 2001, but the offsets stored in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported: 
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to append an offset (1001) to position 12345 no larger than the last offset appended (1950) to xyz/00000000000000000000.index.
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>   at java.lang.Thread.run(Thread.java:722)
> {code} 
> *Proposed fix:* the assignment {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} should only happen in the {{if (assignOffsets)}} branch of the code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)