Posted to dev@kafka.apache.org by "Petr Plavjaník (JIRA)" <ji...@apache.org> on 2017/05/02 13:34:04 UTC

[jira] [Updated] (KAFKA-5155) Messages can be deleted prematurely when some producers use timestamps and some not

     [ https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Petr Plavjaník updated KAFKA-5155:
----------------------------------
    Description: 
Some messages can be deleted prematurely and never read in the following scenario. One producer uses timestamps, and its messages are appended at the beginning of a log segment. Another producer then appends messages without a timestamp. In that case the segment's largest timestamp is determined by the old timestamped messages, because new messages without a timestamp do not influence it, so the segment containing both old and new messages can be deleted immediately after the last new untimestamped message is appended. When all appended messages have no timestamp, they are not deleted prematurely, because the {{lastModified}} attribute of the {{LogSegment}} is used instead.
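
To make the failure mode concrete, here is a minimal sketch of the retention decision under a simplified model; {{SegmentInfo}}, {{largestTimestamp}}, and {{eligibleForDeletion}} are illustrative names, not the actual Kafka internals:
{code}
// Minimal, illustrative model of time-based retention (not Kafka's real code).
object RetentionSketch {
  // -1 marks messages produced without a timestamp.
  val NoTimestamp: Long = -1L

  // maxTimestampSoFar: largest message timestamp seen in the segment;
  // lastModifiedMs: mtime of the segment file on disk.
  case class SegmentInfo(maxTimestampSoFar: Long, lastModifiedMs: Long)

  // The file mtime is consulted only when *no* message in the
  // segment carried a timestamp.
  def largestTimestamp(s: SegmentInfo): Long =
    if (s.maxTimestampSoFar == NoTimestamp) s.lastModifiedMs
    else s.maxTimestampSoFar

  def eligibleForDeletion(s: SegmentInfo, nowMs: Long, retentionMs: Long): Boolean =
    nowMs - largestTimestamp(s) > retentionMs

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    // One old timestamped message (t = 0) pins the segment's largest
    // timestamp, so freshly appended untimestamped messages are ignored:
    val mixed = SegmentInfo(maxTimestampSoFar = 0L, lastModifiedMs = now)
    println(eligibleForDeletion(mixed, now, retentionMs = 10000000L)) // true: deleted prematurely
    // With no timestamped messages at all, the fresh mtime protects the segment:
    val untimestamped = SegmentInfo(maxTimestampSoFar = NoTimestamp, lastModifiedMs = now)
    println(eligibleForDeletion(untimestamped, now, retentionMs = 10000000L)) // false
  }
}
{code}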

A new test case for {{kafka.log.LogTest}} that fails:
{code}
  @Test
  def shouldNotDeleteTimeBasedSegmentsWhenTimestampIsNotProvidedForSomeMessages() {
    val retentionMs = 10000000
    // one message carrying an old, explicit timestamp (t = 0)
    val old = TestUtils.singletonRecords("test".getBytes, timestamp = 0)
    // messages without a timestamp (-1); magic v0 predates message timestamps
    val set = TestUtils.singletonRecords("test".getBytes, timestamp = -1, magicValue = 0)
    val log = createLog(set.sizeInBytes, retentionMs = retentionMs)

    // append some messages to create some segments
    log.append(old)
    for (_ <- 0 until 12)
      log.append(set)

    assertEquals("No segment should be deleted", 0, log.deleteOldSegments())
  }
{code}

This can be prevented by defining {{def largestTimestamp = Math.max(maxTimestampSoFar, lastModified)}} in {{LogSegment}}, or by using the current timestamp when messages with timestamp {{-1}} are appended.
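
Applied to the illustrative model sketched above, the first proposed fix is a one-line change, and the second can be sketched at append time (again, these names are illustrative, not Kafka's actual internals):
{code}
// Fix 1: never let a segment look older than its file mtime.
def largestTimestampFixed(s: SegmentInfo): Long =
  Math.max(s.maxTimestampSoFar, s.lastModifiedMs)

// Fix 2 (alternative): stamp untimestamped messages at append time, so they
// participate in maxTimestampSoFar like any other message.
def effectiveTimestamp(messageTimestamp: Long, nowMs: Long): Long =
  if (messageTimestamp == NoTimestamp) nowMs else messageTimestamp
{code}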

  was:
Some messages can be deleted prematurely and never read in the following scenario. One producer uses timestamps, and its messages are appended at the beginning of a log segment. Another producer then appends messages without a timestamp. In that case the segment's largest timestamp is determined by the old timestamped messages, because new messages without a timestamp do not influence it, so the segment containing both old and new messages can be deleted immediately after the last new untimestamped message is appended. When all appended messages have no timestamp, they are not deleted prematurely, because the {{lastModified}} attribute of the {{LogSegment}} is used instead.

A new test case for {{kafka.log.LogTest}} that fails:
{code}
  @Test
  def shouldNotDeleteTimeBasedSegmentsWhenTimestampIsNotProvidedForSomeMessages() {
    val retentionMs = 10000000
    val old = TestUtils.singletonRecords("test".getBytes, timestamp = 0)
    val set = TestUtils.singletonRecords("test".getBytes, timestamp = -1)
    val log = createLog(set.sizeInBytes, retentionMs = retentionMs)

    // append some messages to create some segments
    log.append(old)
    for (_ <- 0 until 14)
      log.append(set)

    log.deleteOldSegments()
    assertEquals("There should be 3 segments remaining", 3, log.numberOfSegments)
  }
{code}

This can be prevented by defining {{def largestTimestamp = Math.max(maxTimestampSoFar, lastModified)}} in {{LogSegment}}, or by using the current timestamp when messages with timestamp {{-1}} are appended.


> Messages can be deleted prematurely when some producers use timestamps and some not
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-5155
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5155
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.10.2.0
>            Reporter: Petr Plavjaník
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)