Posted to commits@kafka.apache.org by rs...@apache.org on 2020/10/16 09:14:09 UTC

[kafka] branch 2.7 updated: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch (#9434)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new d4b79be  MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch (#9434)
d4b79be is described below

commit d4b79be65061629ead030c73b1f66fa3f4eb4d72
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Oct 16 09:58:01 2020 +0100

    MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch (#9434)
    
    In 2.7, we added lastFetchedEpoch to fetch requests and divergingEpoch to fetch responses. These fields are not used for truncation yet, but in order to rely on them for truncation with IBP 2.7 onwards in the next release, we need to handle them in all the supporting classes in 2.7 as well.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
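    
    For context, here is a minimal, self-contained Scala sketch of the divergence check this patch adds to DelayedFetch.tryComplete (Case H). The EpochEndOffset/FetchState case classes and the leaderEpochEndOffset lookup are simplified stand-ins, not Kafka classes, and the real code also force-completes when the leader returns an error or an undefined epoch/offset:
    
    object DivergingEpochSketch {
    
      // Simplified stand-ins for the leader's epoch-end-offset answer and the
      // follower's cached fetch request state.
      final case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)
      final case class FetchState(fetchOffset: Long, lastFetchedEpoch: Option[Int])
    
      // Returns true when the leader's log has diverged from what the follower last
      // fetched, i.e. the fetch should be completed immediately so the follower can
      // truncate before fetching further.
      def epochDiverges(fetch: FetchState, leaderEpochEndOffset: Int => EpochEndOffset): Boolean =
        fetch.lastFetchedEpoch.exists { fetchEpoch =>
          val epochEndOffset = leaderEpochEndOffset(fetchEpoch)
          epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetch.fetchOffset
        }
    
      def main(args: Array[String]): Unit = {
        // Leader's log ends at offset 490 for the requested epoch; a follower that has
        // already fetched up to 500 in epoch 9 has diverged and must truncate to 490.
        val lookup: Int => EpochEndOffset = epoch => EpochEndOffset(leaderEpoch = epoch, endOffset = 490L)
        println(epochDiverges(FetchState(fetchOffset = 500L, lastFetchedEpoch = Some(9)), lookup)) // true
        println(epochDiverges(FetchState(fetchOffset = 480L, lastFetchedEpoch = Some(9)), lookup)) // false
        println(epochDiverges(FetchState(fetchOffset = 500L, lastFetchedEpoch = None), lookup))    // false
      }
    }
    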
---
 .../src/main/scala/kafka/server/DelayedFetch.scala | 15 +++++
 .../src/main/scala/kafka/server/FetchSession.scala | 12 ++--
 .../main/scala/kafka/server/ReplicaManager.scala   |  6 +-
 .../kafka/server/DelayedFetchTest.scala            | 66 ++++++++++++++++++----
 .../scala/unit/kafka/server/FetchSessionTest.scala | 61 ++++++++++++++++++++
 5 files changed, 144 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index f34b8e7..fb07077 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 
@@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long,
    * Case E: This broker is the leader, but the requested epoch is now fenced
    * Case F: The fetch offset locates not on the last segment of the log
    * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+   * Case H: A diverging epoch was found, return response to trigger truncation
    * Upon completion, should return whatever data is available for each valid partition
    */
   override def tryComplete(): Boolean = {
@@ -118,6 +120,19 @@ class DelayedFetch(delayMs: Long,
                   accumulatedSize += bytesAvailable
               }
             }
+
+            // Case H: If truncation has caused diverging epoch while this request was in purgatory, return to trigger truncation
+            fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
+              val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
+              if (epochEndOffset.error != Errors.NONE || epochEndOffset.hasUndefinedEpochOrOffset) {
+                debug(s"Could not obtain last offset for leader epoch for partition $topicPartition, epochEndOffset=$epochEndOffset.")
+                return forceComplete()
+              } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
+                debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition " +
+                  s"$topicPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
+                return forceComplete()
+              }
+            }
           }
         } catch {
           case _: NotLeaderOrFollowerException =>  // Case A or Case B
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index ce579a6..140fba0 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -77,7 +77,8 @@ class CachedPartition(val topic: String,
                       var highWatermark: Long,
                       var leaderEpoch: Optional[Integer],
                       var fetcherLogStartOffset: Long,
-                      var localLogStartOffset: Long)
+                      var localLogStartOffset: Long,
+                      var lastFetchedEpoch: Optional[Integer])
     extends ImplicitLinkedHashCollection.Element {
 
   var cachedNext: Int = ImplicitLinkedHashCollection.INVALID_INDEX
@@ -89,21 +90,21 @@ class CachedPartition(val topic: String,
   override def setPrev(prev: Int): Unit = this.cachedPrev = prev
 
   def this(topic: String, partition: Int) =
-    this(topic, partition, -1, -1, -1, Optional.empty(), -1, -1)
+    this(topic, partition, -1, -1, -1, Optional.empty(), -1, -1, Optional.empty[Integer])
 
   def this(part: TopicPartition) =
     this(part.topic, part.partition)
 
   def this(part: TopicPartition, reqData: FetchRequest.PartitionData) =
     this(part.topic, part.partition, reqData.maxBytes, reqData.fetchOffset, -1,
-      reqData.currentLeaderEpoch, reqData.logStartOffset, -1)
+      reqData.currentLeaderEpoch, reqData.logStartOffset, -1, reqData.lastFetchedEpoch)
 
   def this(part: TopicPartition, reqData: FetchRequest.PartitionData,
            respData: FetchResponse.PartitionData[Records]) =
     this(part.topic, part.partition, reqData.maxBytes, reqData.fetchOffset, respData.highWatermark,
-      reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset)
+      reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch)
+  def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
   def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
     // Update our cached request parameters.
@@ -111,6 +112,7 @@ class CachedPartition(val topic: String,
     fetchOffset = reqData.fetchOffset
     fetcherLogStartOffset = reqData.logStartOffset
     leaderEpoch = reqData.currentLeaderEpoch
+    lastFetchedEpoch = reqData.lastFetchedEpoch
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9c7307b..b9487fe 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1048,6 +1048,7 @@ class ReplicaManager(val config: KafkaConfig,
     // check if this fetch request can be satisfied right away
     var bytesReadable: Long = 0
     var errorReadingData = false
+    var hasDivergingEpoch = false
     val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
     logReadResults.foreach { case (topicPartition, logReadResult) =>
       brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
@@ -1055,6 +1056,8 @@ class ReplicaManager(val config: KafkaConfig,
 
       if (logReadResult.error != Errors.NONE)
         errorReadingData = true
+      if (logReadResult.divergingEpoch.nonEmpty)
+        hasDivergingEpoch = true
       bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
       logReadResultMap.put(topicPartition, logReadResult)
     }
@@ -1063,7 +1066,8 @@ class ReplicaManager(val config: KafkaConfig,
     //                        2) fetch request does not require any data
     //                        3) has enough data to respond
     //                        4) some error happens while reading data
-    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
+    //                        5) we found a diverging epoch
+    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
         val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId)
         tp -> FetchPartitionData(
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 01ebfd1..5b9e056 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -20,11 +20,12 @@ import java.util.Optional
 
 import scala.collection.Seq
 import kafka.cluster.Partition
+import kafka.log.LogOffsetSnapshot
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest}
 import org.easymock.{EasyMock, EasyMockSupport}
 import org.junit.Test
 import org.junit.Assert._
@@ -70,7 +71,7 @@ class DelayedFetchTest extends EasyMockSupport {
         .andThrow(new FencedLeaderEpochException("Requested epoch has been fenced"))
     EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
 
-    expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.FENCED_LEADER_EPOCH)
+    expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.FENCED_LEADER_EPOCH)
 
     replayAll()
 
@@ -110,7 +111,7 @@ class DelayedFetchTest extends EasyMockSupport {
 
     EasyMock.expect(replicaManager.getPartitionOrException(topicPartition))
       .andThrow(new NotLeaderOrFollowerException(s"Replica for $topicPartition not available"))
-    expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER)
+    expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER)
     EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
 
     replayAll()
@@ -120,6 +121,51 @@ class DelayedFetchTest extends EasyMockSupport {
     assertTrue(fetchResultOpt.isDefined)
   }
 
+  @Test
+  def testDivergingEpoch(): Unit = {
+    val topicPartition = new TopicPartition("topic", 0)
+    val fetchOffset = 500L
+    val logStartOffset = 0L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val lastFetchedEpoch = Optional.of[Integer](9)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch))
+    val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      delayMs = 500,
+      fetchMetadata = fetchMetadata,
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      clientMetadata = None,
+      responseCallback = callback)
+
+    val partition: Partition = mock(classOf[Partition])
+    EasyMock.expect(replicaManager.getPartitionOrException(topicPartition)).andReturn(partition)
+    val endOffsetMetadata = LogOffsetMetadata(messageOffset = 500L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
+    EasyMock.expect(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .andReturn(LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    EasyMock.expect(partition.lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch.get, fetchOnlyFromLeader = false))
+      .andReturn(new EpochEndOffset(Errors.NONE, lastFetchedEpoch.get, fetchOffset - 1))
+    EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
+    expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NONE)
+    replayAll()
+
+    assertTrue(delayedFetch.tryComplete())
+    assertTrue(delayedFetch.isCompleted)
+    assertTrue(fetchResultOpt.isDefined)
+  }
+
   private def buildFetchMetadata(replicaId: Int,
                                  topicPartition: TopicPartition,
                                  fetchStatus: FetchPartitionStatus): FetchMetadata = {
@@ -133,10 +179,10 @@ class DelayedFetchTest extends EasyMockSupport {
       fetchPartitionStatus = Seq((topicPartition, fetchStatus)))
   }
 
-  private def expectReadFromReplicaWithError(replicaId: Int,
-                                             topicPartition: TopicPartition,
-                                             fetchPartitionData: FetchRequest.PartitionData,
-                                             error: Errors): Unit = {
+  private def expectReadFromReplica(replicaId: Int,
+                                    topicPartition: TopicPartition,
+                                    fetchPartitionData: FetchRequest.PartitionData,
+                                    error: Errors): Unit = {
     EasyMock.expect(replicaManager.readFromLocalLog(
       replicaId = replicaId,
       fetchOnlyFromLeader = true,
@@ -146,12 +192,12 @@ class DelayedFetchTest extends EasyMockSupport {
       readPartitionInfo = Seq((topicPartition, fetchPartitionData)),
       clientMetadata = None,
       quota = replicaQuota))
-      .andReturn(Seq((topicPartition, buildReadResultWithError(error))))
+      .andReturn(Seq((topicPartition, buildReadResult(error))))
   }
 
-  private def buildReadResultWithError(error: Errors): LogReadResult = {
+  private def buildReadResult(error: Errors): LogReadResult = {
     LogReadResult(
-      exception = Some(error.exception),
+      exception = if (error != Errors.NONE) Some(error.exception) else None,
       info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
       divergingEpoch = None,
       highWatermark = -1L,
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index c876b90..5ef3105 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -189,6 +189,67 @@ class FetchSessionTest {
   }
 
   @Test
+  def testLastFetchedEpoch(): Unit = {
+    val time = new MockTime()
+    val cache = new FetchSessionCache(10, 1000)
+    val fetchManager = new FetchManager(time, cache)
+
+    val tp0 = new TopicPartition("foo", 0)
+    val tp1 = new TopicPartition("foo", 1)
+    val tp2 = new TopicPartition("bar", 1)
+
+    def cachedLeaderEpochs(context: FetchContext): Map[TopicPartition, Optional[Integer]] = {
+      val mapBuilder = Map.newBuilder[TopicPartition, Optional[Integer]]
+      context.foreachPartition((tp, data) => mapBuilder += tp -> data.currentLeaderEpoch)
+      mapBuilder.result()
+    }
+
+    def cachedLastFetchedEpochs(context: FetchContext): Map[TopicPartition, Optional[Integer]] = {
+      val mapBuilder = Map.newBuilder[TopicPartition, Optional[Integer]]
+      context.foreachPartition((tp, data) => mapBuilder += tp -> data.lastFetchedEpoch)
+      mapBuilder.result()
+    }
+
+    val request1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+    request1.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.empty[Integer], Optional.empty[Integer]))
+    request1.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.of(1), Optional.empty[Integer]))
+    request1.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(2), Optional.of(1)))
+
+    val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, request1, EMPTY_PART_LIST, false)
+    assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.of(1), tp2 -> Optional.of(2)),
+      cachedLeaderEpochs(context1))
+    assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.empty, tp2 -> Optional.of(1)),
+      cachedLastFetchedEpochs(context1))
+
+    val response = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+    response.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
+    response.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 10, 10, 10, null, null))
+    response.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 5, 5, 5, null, null))
+
+    val sessionId = context1.updateAndGenerateResponseData(response).sessionId()
+
+    // With no changes, the cached epochs should remain the same
+    val request2 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+    val context2 = fetchManager.newContext(new JFetchMetadata(sessionId, 1), request2, EMPTY_PART_LIST, false)
+    assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.of(1), tp2 -> Optional.of(2)), cachedLeaderEpochs(context2))
+    assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.empty, tp2 -> Optional.of(1)),
+      cachedLastFetchedEpochs(context2))
+    context2.updateAndGenerateResponseData(response).sessionId()
+
+    // Now verify we can change the leader epoch and the context is updated
+    val request3 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+    request3.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.of(6), Optional.of(5)))
+    request3.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.empty[Integer], Optional.empty[Integer]))
+    request3.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(3), Optional.of(3)))
+
+    val context3 = fetchManager.newContext(new JFetchMetadata(sessionId, 2), request3, EMPTY_PART_LIST, false)
+    assertEquals(Map(tp0 -> Optional.of(6), tp1 -> Optional.empty, tp2 -> Optional.of(3)),
+      cachedLeaderEpochs(context3))
+    assertEquals(Map(tp0 -> Optional.of(5), tp1 -> Optional.empty, tp2 -> Optional.of(3)),
+      cachedLastFetchedEpochs(context2))
+  }
+
+  @Test
   def testFetchRequests(): Unit = {
     val time = new MockTime()
     val cache = new FetchSessionCache(10, 1000)