You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/11/17 22:43:32 UTC

[kafka] branch 2.4 updated: KAFKA-9196; Update high watermark metadata after segment roll (#7695)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 221d667  KAFKA-9196; Update high watermark metadata after segment roll (#7695)
221d667 is described below

commit 221d66774ced8f3d9360bcd138ff3a8553cf1919
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sun Nov 17 14:42:00 2019 -0800

    KAFKA-9196; Update high watermark metadata after segment roll (#7695)
    
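    When we roll a new segment, the log offset metadata tied to the high watermark may
    need to be updated. This is needed when the high watermark is equal to the log end
    offset at the time of the roll, because its metadata (segment base offset and position)
    would otherwise still refer to the old segment rather than the newly rolled one.
    Without the update, we risk exposing uncommitted data early.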
    
    Reviewers: Dhruvil Shah <dh...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/log/Log.scala          |  8 ++++--
 core/src/test/scala/unit/kafka/log/LogTest.scala | 36 ++++++++++++++++++++++++
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index bf0486a..6514aa2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -744,9 +744,9 @@ class Log(@volatile var dir: File,
   private def updateLogEndOffset(messageOffset: Long): Unit = {
     nextOffsetMetadata = LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size)
 
-    // Update the high watermark in case it has gotten ahead of the log end offset
-    // following a truncation.
-    if (highWatermark > messageOffset) {
+    // Update the high watermark in case it has gotten ahead of the log end offset following a truncation
+    // or if a new segment has been rolled and the offset metadata needs to be updated.
+    if (highWatermark >= messageOffset) {
       updateHighWatermarkMetadata(nextOffsetMetadata)
     }
   }
@@ -1912,9 +1912,11 @@ class Log(@volatile var dir: File,
           initFileSize = initFileSize,
           preallocate = config.preallocate)
         addSegment(segment)
+
         // We need to update the segment base offset and append position data of the metadata when log rolls.
         // The next offset should not change.
         updateLogEndOffset(nextOffsetMetadata.messageOffset)
+
         // schedule an asynchronous flush of the old segment
         scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9a69e00..a948cf3 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -78,6 +78,42 @@ class LogTest {
   }
 
   @Test
+  def testHighWatermarkMetadataUpdatedAfterSegmentRoll(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+
+    def assertFetchSizeAndOffsets(fetchOffset: Long,
+                                  expectedSize: Int,
+                                  expectedOffsets: Seq[Long]): Unit = {
+      val readInfo = log.read(
+        startOffset = fetchOffset,
+        maxLength = 2048,
+        isolation = FetchHighWatermark,
+        minOneMessage = false)
+      assertEquals(expectedSize, readInfo.records.sizeInBytes)
+      assertEquals(expectedOffsets, readInfo.records.records.asScala.map(_.offset))
+    }
+
+    val records = TestUtils.records(List(
+      new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
+      new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
+      new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
+    ))
+
+    log.appendAsLeader(records, leaderEpoch = 0)
+    assertFetchSizeAndOffsets(fetchOffset = 0L, 0, Seq())
+
+    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
+    assertFetchSizeAndOffsets(fetchOffset = 0L, records.sizeInBytes, Seq(0, 1, 2))
+
+    log.roll()
+    assertFetchSizeAndOffsets(fetchOffset = 0L, records.sizeInBytes, Seq(0, 1, 2))
+
+    log.appendAsLeader(records, leaderEpoch = 0)
+    assertFetchSizeAndOffsets(fetchOffset = 3L, 0, Seq())
+  }
+
+  @Test
   def testHighWatermarkMaintenance(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)