You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/28 04:19:09 UTC

[kafka] branch 2.0 updated: KAFKA-7104: More consistent leader's state in fetch response (#5305)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 3673ce8  KAFKA-7104: More consistent leader's state in fetch response (#5305)
3673ce8 is described below

commit 3673ce8757217fa64177d396a28dde268179bcd9
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Wed Jun 27 21:15:20 2018 -0700

    KAFKA-7104: More consistent leader's state in fetch response (#5305)
    
    Do not update LogReadResult after it is initially populated when returning fetches immediately (i.e., without hitting the purgatory). The post-population update was introduced in #3954 as an optimization so that followers get the potentially updated high watermark. However, since many things can happen (such as deleting old segments and advancing the log start offset) between the initial creation of LogReadResult and the update, we can hit issues like the log start offset in the fetch response being higher than the last offset [...]
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/server/ReplicaManager.scala   | 17 +++++++----------
 .../scala/unit/kafka/server/ReplicaManagerTest.scala    |  4 +++-
 2 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ed9559f..9658f1a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -90,11 +90,6 @@ case class LogReadResult(info: FetchDataInfo,
     case Some(e) => Errors.forException(e)
   }
 
-  def updateLeaderReplicaInfo(leaderReplica: Replica): LogReadResult =
-    copy(highWatermark = leaderReplica.highWatermark.messageOffset,
-      leaderLogStartOffset = leaderReplica.logStartOffset,
-      leaderLogEndOffset = leaderReplica.logEndOffset.messageOffset)
-
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
@@ -1340,7 +1335,12 @@ class ReplicaManager(val config: KafkaConfig,
 
   /**
    * Update the follower's fetch state in the leader based on the last fetch request and update `readResult`,
-   * if necessary.
+   * if the follower replica is not recognized to be one of the assigned replicas. Do not update
+   * `readResult` otherwise, so that log start/end offset and high watermark is consistent with
+   * records in fetch response. Log start/end offset and high watermark may change not only due to
+   * this fetch request, e.g., rolling new log segment and removing old log segment may move log
+   * start offset further than the last offset in the fetched records. The followers will get the
+   * updated leader's state in the next fetch response.
    */
   private def updateFollowerLogReadResults(replicaId: Int,
                                            readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1351,10 +1351,7 @@ class ReplicaManager(val config: KafkaConfig,
         case Some(partition) =>
           partition.getReplica(replicaId) match {
             case Some(replica) =>
-              if (partition.updateReplicaLogReadResult(replica, readResult))
-                partition.leaderReplicaIfLocal.foreach { leaderReplica =>
-                  updatedReadResult = readResult.updateLeaderReplicaInfo(leaderReplica)
-                }
+              partition.updateReplicaLogReadResult(replica, readResult)
             case None =>
               warn(s"Leader $localBrokerId failed to record follower $replicaId's position " +
                 s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ce88688..56d4b79 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -463,7 +463,9 @@ class ReplicaManagerTest {
 
         val tp0Status = responseStatusMap.get(tp0)
         assertTrue(tp0Status.isDefined)
-        assertEquals(1, tp0Status.get.highWatermark)
+        // the response contains high watermark on the leader before it is updated based
+        // on this fetch request
+        assertEquals(0, tp0Status.get.highWatermark)
         assertEquals(None, tp0Status.get.lastStableOffset)
         assertEquals(Errors.NONE, tp0Status.get.error)
         assertTrue(tp0Status.get.records.batches.iterator.hasNext)