Posted to commits@kafka.apache.org by da...@apache.org on 2023/02/15 08:09:28 UTC

[kafka] branch 3.1 updated: KAFKA-14704; Follower should truncate before incrementing high watermark (#13245)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new a970b91d784 KAFKA-14704; Follower should truncate before incrementing high watermark (#13245)
a970b91d784 is described below

commit a970b91d784c55719f67981b1703bed979ca21ce
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Feb 15 08:39:07 2023 +0100

    KAFKA-14704; Follower should truncate before incrementing high watermark (#13245)
    
    When a leader becomes a follower, it is likely that it has uncommitted records in its log. When it reaches out to the leader, the leader will detect that they have diverged and it will return the diverging epoch and offset. The follower truncates its log based on this.
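
    As a rough illustration only (not part of this patch, and using hypothetical names rather than the real fetcher classes), the truncation decision boils down to something like:

        // Hypothetical sketch: where should a follower truncate its log to,
        // given the diverging epoch/end offset reported by the leader?
        final case class Divergence(epoch: Int, endOffset: Long)

        def truncationPoint(localLogEndOffset: Long, divergence: Divergence): Long =
          // Truncate to the smaller of the local log end offset and the
          // leader's end offset for the diverging epoch.
          math.min(localLogEndOffset, divergence.endOffset)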
    
    There is a small caveat in this process. When the leader returns the diverging epoch and offset, it also includes its high watermark, low watermark, start offset and end offset. The current code in the `AbstractFetcherThread` works as follows: first it processes the partition data and then it checks whether there is a diverging epoch/offset. The former may accidentally expose uncommitted records, as this step updates the local high watermark to whatever is received from the leader. As the follo [...]
    
    When this happens, the follower logs the following messages:
    * `Truncating XXX to offset 21434 below high watermark 21437`
    * `Non-monotonic update of high watermark from (offset=21437 segment=[20998:98390]) to (offset=21434 segment=[20998:97843])`.
    
    This patch proposes to mitigate the issue by first checking whether a diverging epoch/offset is provided by the leader and, if so, skipping the processing of the partition data. This basically means that the first fetch request will result in truncating the log, and a subsequent fetch request will update the low/high watermarks.
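
    In simplified form, the reordered handling looks roughly like the sketch below (placeholder types, not the actual `AbstractFetcherThread` code):

        // Hypothetical sketch of the new ordering: truncate first, and only
        // process the partition data (and thus advance the local watermarks)
        // when the leader reports no divergence.
        sealed trait FetchOutcome
        final case class TruncateTo(epoch: Int, endOffset: Long) extends FetchOutcome
        final case class AppendAndAdvance(leaderHighWatermark: Long) extends FetchOutcome

        def handle(divergingEpoch: Option[(Int, Long)], leaderHighWatermark: Long): FetchOutcome =
          divergingEpoch match {
            // First fetch after the leader change: truncate only; skip the
            // partition data so the local high watermark is not updated yet.
            case Some((epoch, endOffset)) => TruncateTo(epoch, endOffset)
            // A later fetch with no divergence: append the records and pick
            // up the leader's low/high watermarks.
            case None => AppendAndAdvance(leaderHighWatermark)
          }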
    
    Reviewers: Ritika Reddy <rr...@confluent.io>, Justine Olshan <jo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 60 ++++++++--------
 .../kafka/server/AbstractFetcherThreadTest.scala   | 79 ++++++++++++++++++++--
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 77 +++++++++++++++++++++
 3 files changed, 182 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 492cec425e3..fe4808bcfe6 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -344,33 +344,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()
+                        .setPartition(topicPartition.partition)
+                        .setErrorCode(Errors.NONE.code)
+                        .setLeaderEpoch(partitionData.divergingEpoch.epoch)
+                        .setEndOffset(partitionData.divergingEpoch.endOffset)
+                    } else {
+                      // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+                      val logAppendInfoOpt = processPartitionData(
+                        topicPartition,
+                        currentFetchState.fetchOffset,
+                        partitionData
+                      )
+
+                      logAppendInfoOpt.foreach { logAppendInfo =>
+                        val validBytes = logAppendInfo.validBytes
+                        val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
+                        val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
+                        fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
+
+                        // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
+                        if (validBytes > 0 && partitionStates.contains(topicPartition)) {
+                          // Update partitionStates only if there is no exception during processPartitionData
+                          val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
+                            currentFetchState.currentLeaderEpoch, state = Fetching,
+                            logAppendInfo.lastLeaderEpoch)
+                          partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
+                          fetcherStats.byteRate.mark(validBytes)
+                        }
                       }
                     }
                   } catch {
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 148a903187b..fe2421b8285 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -968,6 +968,76 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testTruncateOnFetchDoesNotProcessPartitionData(): Unit = {
+    assumeTrue(truncateOnFetch)
+
+    val partition = new TopicPartition("topic", 0)
+
+    var truncateCalls = 0
+    var processPartitionDataCalls = 0
+    val fetcher = new MockFetcherThread {
+      override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = {
+        processPartitionDataCalls += 1
+        super.processPartitionData(topicPartition, fetchOffset, partitionData)
+      }
+
+      override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
+        truncateCalls += 1
+        super.truncate(topicPartition, truncationState)
+      }
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 3, leaderEpoch = 4, new SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 4, leaderEpoch = 4, new SimpleRecord("e".getBytes)),
+      mkBatch(baseOffset = 5, leaderEpoch = 4, new SimpleRecord("f".getBytes)),
+    )
+
+    val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 1L)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+    assertEquals(6L, replicaState.logEndOffset)
+    fetcher.verifyLastFetchedEpoch(partition, expectedEpoch = Some(4))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 3, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+      mkBatch(baseOffset = 4, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+    )
+
+    val leaderState = MockFetcherThread.PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 4L)
+    fetcher.setLeaderState(partition, leaderState)
+
+    // The first fetch should result in truncating the follower's log and
+    // it should not process the data hence not update the high watermarks.
+    fetcher.doWork()
+
+    assertEquals(1, truncateCalls)
+    assertEquals(0, processPartitionDataCalls)
+    assertEquals(3L, replicaState.logEndOffset)
+    assertEquals(1L, replicaState.highWatermark)
+
+    // Truncate should have been called only once and process partition data
+    // should have been called at least once. The log end offset and the high
+    // watermark are updated.
+    TestUtils.waitUntilTrue(() => {
+      fetcher.doWork()
+      fetcher.replicaPartitionState(partition).log == fetcher.leaderPartitionState(partition).log
+    }, "Failed to reconcile leader and follower logs")
+    fetcher.verifyLastFetchedEpoch(partition, Some(5))
+
+    assertEquals(1, truncateCalls)
+    assertTrue(processPartitionDataCalls >= 1)
+    assertEquals(5L, replicaState.logEndOffset)
+    assertEquals(4L, replicaState.highWatermark)
+  }
+
   @Test
   def testMaybeUpdateTopicIds(): Unit = {
     val partition = new TopicPartition("topic1", 0)
@@ -1059,13 +1129,8 @@ class AbstractFetcherThreadTest {
       val state = replicaPartitionState(topicPartition)
 
       if (isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
-        val divergingEpoch = partitionData.divergingEpoch
-        truncateOnFetchResponse(Map(topicPartition -> new EpochEndOffset()
-          .setPartition(topicPartition.partition)
-          .setErrorCode(Errors.NONE.code)
-          .setLeaderEpoch(divergingEpoch.epoch)
-          .setEndOffset(divergingEpoch.endOffset)))
-        return None
+        throw new IllegalStateException("processPartitionData should not be called for a partition with " +
+          "a diverging epoch.")
       }
 
       // Throw exception if the fetchOffset does not match the fetcherThread partition state
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index bbd9330727a..669923a0cca 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -620,6 +620,83 @@ class ReplicaFetcherThreadTest {
     partitions.foreach { tp => assertEquals(Fetching, thread.fetchState(tp).get.state) }
   }
 
+  @Test
+  def testTruncateOnFetchDoesNotUpdateHighWatermark(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val quota: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = mock(classOf[LogManager])
+    val log: UnifiedLog = mock(classOf[UnifiedLog])
+    val partition: Partition = mock(classOf[Partition])
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+
+    val logEndOffset = 150
+    val highWatermark = 130
+
+    expect(log.highWatermark).andReturn(highWatermark).anyTimes()
+    expect(log.latestEpoch).andReturn(Some(5)).anyTimes()
+    expect(log.endOffsetForEpoch(4)).andReturn(Some(OffsetAndEpoch(149, 4))).anyTimes()
+    expect(log.logEndOffset).andReturn(logEndOffset).anyTimes()
+    expect(log.logStartOffset).andReturn(0).anyTimes()
+
+    expect(replicaManager.metadataCache).andReturn(metadataCache).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    expect(replicaManager.brokerTopicStats).andReturn(new BrokerTopicStats).anyTimes()
+
+    expect(replicaManager.localLogOrException(t1p0)).andReturn(log).anyTimes()
+    expect(replicaManager.getPartitionOrException(t1p0)).andReturn(partition).anyTimes()
+
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(partition.appendRecordsToFollowerOrFutureReplica(anyObject(), anyObject())).andReturn(None).anyTimes()
+    expect(partition.truncateTo(140, false)).once()
+
+    expect(quota.isThrottled(anyObject())).andReturn(false).anyTimes()
+
+    replay(replicaManager, logManager, quota, partition, log)
+
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(
+      Collections.emptyMap(),
+      brokerEndPoint,
+      new SystemTime()
+    )
+
+    val thread = new ReplicaFetcherThread(
+      "fetcher-thread",
+      0,
+      brokerEndPoint,
+      config,
+      failedPartitions,
+      replicaManager,
+      new Metrics(),
+      new SystemTime(),
+      quota,
+      Some(mockNetwork)
+    )
+
+    thread.addPartitions(Map(
+      t1p0 -> initialFetchState(Some(topicId1), logEndOffset))
+    )
+
+    // Prepare the fetch response data.
+    mockNetwork.setFetchPartitionDataForNextResponse(Map(
+      t1p0 -> new FetchResponseData.PartitionData()
+        .setPartitionIndex(t1p0.partition)
+        .setLastStableOffset(0)
+        .setLogStartOffset(0)
+        .setHighWatermark(160) // HWM is higher on the leader.
+        .setDivergingEpoch(new FetchResponseData.EpochEndOffset()
+          .setEpoch(4)
+          .setEndOffset(140))
+    ))
+    mockNetwork.setIdsForNextResponse(topicIds)
+
+    // Sends the fetch request and processes the response. This should truncate the
+    // log but it should not update the high watermark.
+    thread.doWork()
+
+    assertEquals(1, mockNetwork.fetchCount)
+    verify(partition, log)
+  }
+
   @Test
   def shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20(): Unit = {