Posted to commits@kafka.apache.org by ij...@apache.org on 2020/02/04 19:30:07 UTC

[kafka] branch 2.5 updated: KAFKA-9491; Increment high watermark after full log truncation (#8037)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new c9abfd6  KAFKA-9491; Increment high watermark after full log truncation (#8037)
c9abfd6 is described below

commit c9abfd6a0d47309fd2af25bfc08eb0300ed38538
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Feb 4 11:22:33 2020 -0800

    KAFKA-9491; Increment high watermark after full log truncation (#8037)
    
    When a follower's fetch offset is behind the leader's log start offset, the
    follower performs a full log truncation. When it does so, it must update both
    its log start offset and high watermark. The previous code did the former,
    but not the latter. Failure to update the high watermark in this case can lead
    to out-of-range errors if the follower becomes leader before receiving the latest
    high watermark from the previous leader. The out-of-range errors occur when
    we attempt to resolve the log position of the high watermark in DelayedFetch
    in order to determine whether a fetch is satisfied.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>, Chia-Ping Tsai <ch...@gmail.com>, Ismael Juma <is...@juma.me.uk>
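
Illustrative sketch (not part of the commit): the fix keeps the high watermark within
the bounds logStartOffset <= highWatermark <= logEndOffset after a full truncation.
The minimal, self-contained Scala below models just that behavior; the names
HighWatermarkSketch and SketchLog are hypothetical, and the real logic lives in
kafka.log.Log.

    // Hypothetical, simplified model of the offsets kafka.log.Log maintains.
    object HighWatermarkSketch {

      final class SketchLog {
        var logStartOffset: Long = 0L
        var logEndOffset: Long = 0L
        var highWatermark: Long = 0L

        def updateHighWatermark(offset: Long): Unit = {
          highWatermark = offset
        }

        // Mirrors the intent of the new updateLogStartOffset helper: advancing the
        // log start offset must drag the high watermark along with it.
        private def updateLogStartOffset(offset: Long): Unit = {
          logStartOffset = offset
          if (highWatermark < offset) {
            updateHighWatermark(offset)
          }
        }

        // Full truncation, as performed when a follower's fetch offset is behind
        // the leader's log start offset. Before the fix only the start offset moved.
        def truncateFullyAndStartAt(newOffset: Long): Unit = {
          logEndOffset = newOffset
          updateLogStartOffset(newOffset)
        }
      }

      def main(args: Array[String]): Unit = {
        val log = new SketchLog
        log.logEndOffset = 6L
        log.updateHighWatermark(3L)

        // The replica falls behind the leader's log start offset (say 10) and
        // truncates its log fully, restarting at offset 10.
        log.truncateFullyAndStartAt(10L)

        // With the fix the high watermark is 10 as well. Without it the stale value 3
        // would sit below the log start offset, and resolving its log position (as
        // DelayedFetch does to decide whether a fetch is satisfied) could surface an
        // out-of-range error once this replica becomes leader.
        assert(log.highWatermark == log.logStartOffset)
        println(s"logStartOffset=${log.logStartOffset} highWatermark=${log.highWatermark} " +
          s"logEndOffset=${log.logEndOffset}")
      }
    }

The same reasoning explains why truncateTo and truncateFullyAndStartAt in the diff
below now route through updateLogEndOffset and updateLogStartOffset instead of
assigning logStartOffset and recoveryPoint directly.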
---
 core/src/main/scala/kafka/cluster/Partition.scala |  8 +++---
 core/src/main/scala/kafka/log/Log.scala           | 33 ++++++++++++++++-------
 core/src/test/scala/unit/kafka/log/LogTest.scala  | 20 +++++++++++---
 3 files changed, 43 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 156c098..9cfd99e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -490,8 +490,9 @@ class Partition(val topicPartition: TopicPartition,
 
       val leaderLog = localLogOrException
       val leaderEpochStartOffset = leaderLog.logEndOffset
-      info(s"$topicPartition starts at Leader Epoch ${partitionState.leaderEpoch} from " +
-        s"offset $leaderEpochStartOffset. Previous Leader Epoch was: $leaderEpoch")
+      info(s"$topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
+        s"offset $leaderEpochStartOffset with high watermark ${leaderLog.highWatermark}. " +
+        s"Previous leader epoch was $leaderEpoch.")
 
       //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
       leaderEpoch = partitionState.leaderEpoch
@@ -506,12 +507,11 @@ class Partition(val topicPartition: TopicPartition,
       leaderLog.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
 
       val isNewLeader = !isLeader
-      val curLeaderLogEndOffset = leaderLog.logEndOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
       remoteReplicas.foreach { replica =>
         val lastCaughtUpTimeMs = if (inSyncReplicaIds.contains(replica.brokerId)) curTimeMs else 0L
-        replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
+        replica.resetLastCaughtUpTime(leaderEpochStartOffset, curTimeMs, lastCaughtUpTimeMs)
       }
 
       if (isNewLeader) {
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 3f1142e..c6bce27 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -301,7 +301,7 @@ class Log(@volatile var dir: File,
 
     leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
 
-    logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
+    updateLogStartOffset(math.max(logStartOffset, segments.firstEntry.getValue.baseOffset))
 
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
     leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
@@ -721,14 +721,30 @@ class Log(@volatile var dir: File,
     }
   }
 
-  private def updateLogEndOffset(messageOffset: Long): Unit = {
-    nextOffsetMetadata = LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size)
+  private def updateLogEndOffset(offset: Long): Unit = {
+    nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size)
 
     // Update the high watermark in case it has gotten ahead of the log end offset following a truncation
     // or if a new segment has been rolled and the offset metadata needs to be updated.
-    if (highWatermark >= messageOffset) {
+    if (highWatermark >= offset) {
       updateHighWatermarkMetadata(nextOffsetMetadata)
     }
+
+    if (this.recoveryPoint > offset) {
+      this.recoveryPoint = offset
+    }
+  }
+
+  private def updateLogStartOffset(offset: Long): Unit = {
+    logStartOffset = offset
+
+    if (highWatermark < offset) {
+      updateHighWatermark(offset)
+    }
+
+    if (this.recoveryPoint < offset) {
+      this.recoveryPoint = offset
+    }
   }
 
   /**
@@ -1242,7 +1258,7 @@ class Log(@volatile var dir: File,
         checkIfMemoryMappedBufferClosed()
         if (newLogStartOffset > logStartOffset) {
           info(s"Incrementing log start offset to $newLogStartOffset")
-          logStartOffset = newLogStartOffset
+          updateLogStartOffset(newLogStartOffset)
           leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
           producerStateManager.truncateHead(newLogStartOffset)
           maybeIncrementFirstUnstableOffset()
@@ -2046,8 +2062,7 @@ class Log(@volatile var dir: File,
             removeAndDeleteSegments(deletable, asyncDelete = true)
             activeSegment.truncateTo(targetOffset)
             updateLogEndOffset(targetOffset)
-            this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
-            this.logStartOffset = math.min(targetOffset, this.logStartOffset)
+            updateLogStartOffset(math.min(targetOffset, this.logStartOffset))
             leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
             loadProducerState(targetOffset, reloadFromCleanShutdown = false)
           }
@@ -2081,9 +2096,7 @@ class Log(@volatile var dir: File,
         producerStateManager.truncate()
         producerStateManager.updateMapEndOffset(newOffset)
         maybeIncrementFirstUnstableOffset()
-
-        this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
-        this.logStartOffset = newOffset
+        updateLogStartOffset(newOffset)
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 26a7ffd..6bbedbb 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -117,12 +117,13 @@ class LogTest {
   def testHighWatermarkMaintenance(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
+    val leaderEpoch = 0
 
-    val records = TestUtils.records(List(
+    def records(offset: Long): MemoryRecords = TestUtils.records(List(
       new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
       new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
       new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
-    ))
+    ), baseOffset = offset, partitionLeaderEpoch = leaderEpoch)
 
     def assertHighWatermark(offset: Long): Unit = {
       assertEquals(offset, log.highWatermark)
@@ -133,7 +134,7 @@ class LogTest {
     assertHighWatermark(0L)
 
     // High watermark not changed by append
-    log.appendAsLeader(records, leaderEpoch = 0)
+    log.appendAsLeader(records(0), leaderEpoch)
     assertHighWatermark(0L)
 
     // Update high watermark as leader
@@ -145,13 +146,24 @@ class LogTest {
     assertHighWatermark(3L)
 
     // Update high watermark as follower
-    log.appendAsLeader(records, leaderEpoch = 0)
+    log.appendAsFollower(records(3L))
     log.updateHighWatermark(6L)
     assertHighWatermark(6L)
 
     // High watermark should be adjusted by truncation
     log.truncateTo(3L)
     assertHighWatermark(3L)
+
+    log.appendAsLeader(records(0L), leaderEpoch = 0)
+    assertHighWatermark(3L)
+    assertEquals(6L, log.logEndOffset)
+    assertEquals(0L, log.logStartOffset)
+
+    // Full truncation should also reset high watermark
+    log.truncateFullyAndStartAt(4L)
+    assertEquals(4L, log.logEndOffset)
+    assertEquals(4L, log.logStartOffset)
+    assertHighWatermark(4L)
   }
 
   private def assertNonEmptyFetch(log: Log, offset: Long, isolation: FetchIsolation): Unit = {