You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/02/14 09:00:45 UTC

[kafka] branch 3.3 updated: KAFKA-14704; Follower should truncate before incrementing high watermark (#13230)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new b8e01e24060 KAFKA-14704; Follower should truncate before incrementing high watermark (#13230)
b8e01e24060 is described below

commit b8e01e240605e305f55925802886c51d9b0c7edd
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Feb 14 09:54:32 2023 +0100

    KAFKA-14704; Follower should truncate before incrementing high watermark (#13230)
    
    When a leader becomes a follower, it is likely that it has uncommitted records in its log. When it reaches out to the leader, the leader will detect that they have diverged and it will return the diverging epoch and offset. The follower truncates its log based on this.
    
    There is a small caveat in this process. When the leader returns the diverging epoch and offset, it also includes its high watermark, low watermark, start offset and end offset. The current code in the `AbstractFetcherThread` works as follows. First it processes the partition data and then it checks whether there is a diverging epoch/offset. The former may accidentally expose uncommitted records as this step updates the local watermark to whatever is received from the leader. As the follo [...]
    
    When this happens, the follower logs the following messages:
    * `Truncating XXX to offset 21434 below high watermark 21437`
    * `Non-monotonic update of high watermark from (offset=21437 segment=[20998:98390]) to (offset=21434 segment=[20998:97843])`.
    
    This patch proposes to mitigate the issue by first checking whether a diverging epoch/offset is provided by the leader and, if it is, skipping the processing of the partition data. This basically means that the first fetch request will result in truncating the log and a subsequent fetch request will update the low/high watermarks.
    
    Reviewers: Ritika Reddy <rr...@confluent.io>, Justine Olshan <jo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 60 ++++++++--------
 .../kafka/server/AbstractFetcherThreadTest.scala   | 80 +++++++++++++++++++--
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 81 ++++++++++++++++++++++
 3 files changed, 187 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2ae3f45023b..bd3a490a5a6 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -331,33 +331,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()
+                        .setPartition(topicPartition.partition)
+                        .setErrorCode(Errors.NONE.code)
+                        .setLeaderEpoch(partitionData.divergingEpoch.epoch)
+                        .setEndOffset(partitionData.divergingEpoch.endOffset)
+                    } else {
+                      // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+                      val logAppendInfoOpt = processPartitionData(
+                        topicPartition,
+                        currentFetchState.fetchOffset,
+                        partitionData
+                      )
+
+                      logAppendInfoOpt.foreach { logAppendInfo =>
+                        val validBytes = logAppendInfo.validBytes
+                        val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
+                        val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
+                        fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
+
+                        // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
+                        if (validBytes > 0 && partitionStates.contains(topicPartition)) {
+                          // Update partitionStates only if there is no exception during processPartitionData
+                          val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
+                            currentFetchState.currentLeaderEpoch, state = Fetching,
+                            logAppendInfo.lastLeaderEpoch)
+                          partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
+                          fetcherStats.byteRate.mark(validBytes)
+                        }
                       }
                     }
                   } catch {
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index cdd17b1af2c..9a64de06085 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -999,6 +999,77 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testTruncateOnFetchDoesNotProcessPartitionData(): Unit = {
+    assumeTrue(truncateOnFetch)
+
+    val partition = new TopicPartition("topic", 0)
+
+    var truncateCalls = 0
+    var processPartitionDataCalls = 0
+    val fetcher = new MockFetcherThread(new MockLeaderEndPoint) {
+      override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = {
+        processPartitionDataCalls += 1
+        super.processPartitionData(topicPartition, fetchOffset, partitionData)
+      }
+
+      override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
+        truncateCalls += 1
+        super.truncate(topicPartition, truncationState)
+      }
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 3, leaderEpoch = 4, new SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 4, leaderEpoch = 4, new SimpleRecord("e".getBytes)),
+      mkBatch(baseOffset = 5, leaderEpoch = 4, new SimpleRecord("f".getBytes)),
+    )
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 1L)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+    assertEquals(6L, replicaState.logEndOffset)
+    fetcher.verifyLastFetchedEpoch(partition, expectedEpoch = Some(4))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 3, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+      mkBatch(baseOffset = 4, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+    )
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 4L)
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    // The first fetch should result in truncating the follower's log and
+    // it should not process the data hence not update the high watermarks.
+    fetcher.doWork()
+
+    assertEquals(1, truncateCalls)
+    assertEquals(0, processPartitionDataCalls)
+    assertEquals(3L, replicaState.logEndOffset)
+    assertEquals(1L, replicaState.highWatermark)
+
+    // Truncate should have been called only once and process partition data
+    // should have been called at least once. The log end offset and the high
+    // watermark are updated.
+    TestUtils.waitUntilTrue(() => {
+      fetcher.doWork()
+      fetcher.replicaPartitionState(partition).log == fetcher.mockLeader.leaderPartitionState(partition).log
+    }, "Failed to reconcile leader and follower logs")
+    fetcher.verifyLastFetchedEpoch(partition, Some(5))
+
+    assertEquals(1, truncateCalls)
+    assertTrue(processPartitionDataCalls >= 1)
+    assertEquals(5L, replicaState.logEndOffset)
+    assertEquals(4L, replicaState.highWatermark)
+  }
+
   @Test
   def testMaybeUpdateTopicIds(): Unit = {
     val partition = new TopicPartition("topic1", 0)
@@ -1287,13 +1358,8 @@ class AbstractFetcherThreadTest {
       val state = replicaPartitionState(topicPartition)
 
       if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
-        val divergingEpoch = partitionData.divergingEpoch
-        truncateOnFetchResponse(Map(topicPartition -> new EpochEndOffset()
-          .setPartition(topicPartition.partition)
-          .setErrorCode(Errors.NONE.code)
-          .setLeaderEpoch(divergingEpoch.epoch)
-          .setEndOffset(divergingEpoch.endOffset)))
-        return None
+        throw new IllegalStateException("processPartitionData should not be called for a partition with " +
+          "a diverging epoch.")
       }
 
       // Throw exception if the fetchOffset does not match the fetcherThread partition state
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 8eeb22046e6..84f5d84926e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -659,6 +659,87 @@ class ReplicaFetcherThreadTest {
     partitions.foreach { tp => assertEquals(Fetching, thread.fetchState(tp).get.state) }
   }
 
+  @Test
+  def testTruncateOnFetchDoesNotUpdateHighWatermark(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val quota: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = mock(classOf[LogManager])
+    val log: UnifiedLog = mock(classOf[UnifiedLog])
+    val partition: Partition = mock(classOf[Partition])
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+
+    val logEndOffset = 150
+    val highWatermark = 130
+
+    when(log.highWatermark).thenReturn(highWatermark)
+    when(log.latestEpoch).thenReturn(Some(5))
+    when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4)))
+    when(log.logEndOffset).thenReturn(logEndOffset)
+
+    when(replicaManager.metadataCache).thenReturn(metadataCache)
+    when(replicaManager.logManager).thenReturn(logManager)
+
+    when(replicaManager.localLogOrException(t1p0)).thenReturn(log)
+    when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+
+    when(partition.localLogOrException).thenReturn(log)
+    when(partition.appendRecordsToFollowerOrFutureReplica(any(), any())).thenReturn(None)
+
+    val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
+
+    val mockNetwork = new MockBlockingSender(
+      Collections.emptyMap(),
+      brokerEndPoint,
+      new SystemTime()
+    )
+
+    val leader = new RemoteLeaderEndPoint(
+      logContext.logPrefix,
+      mockNetwork,
+      new FetchSessionHandler(logContext, brokerEndPoint.id),
+      config,
+      replicaManager,
+      quota,
+      () => config.interBrokerProtocolVersion
+    )
+
+    val thread = new ReplicaFetcherThread(
+      "fetcher-thread",
+      leader,
+      config,
+      failedPartitions,
+      replicaManager,
+      quota,
+      logContext.logPrefix,
+      () => config.interBrokerProtocolVersion
+    )
+
+    thread.addPartitions(Map(
+      t1p0 -> initialFetchState(Some(topicId1), logEndOffset))
+    )
+
+    // Prepare the fetch response data.
+    mockNetwork.setFetchPartitionDataForNextResponse(Map(
+      t1p0 -> new FetchResponseData.PartitionData()
+        .setPartitionIndex(t1p0.partition)
+        .setLastStableOffset(0)
+        .setLogStartOffset(0)
+        .setHighWatermark(160) // HWM is higher on the leader.
+        .setDivergingEpoch(new FetchResponseData.EpochEndOffset()
+          .setEpoch(4)
+          .setEndOffset(140))
+    ))
+    mockNetwork.setIdsForNextResponse(topicIds)
+
+    // Sends the fetch request and processes the response. This should truncate the
+    // log but it should not update the high watermark.
+    thread.doWork()
+
+    assertEquals(1, mockNetwork.fetchCount)
+    verify(partition, times(1)).truncateTo(140, false)
+    verify(log, times(0)).maybeUpdateHighWatermark(anyLong())
+  }
+
   @Test
   def shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20(): Unit = {