You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/11/13 05:27:20 UTC

[kafka] branch 2.4 updated: MINOR: refactor replica last sent HW updates due to performance regression (#7671)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 577d235  MINOR: refactor replica last sent HW updates due to performance regression (#7671)
577d235 is described below

commit 577d235e11e2cc23020dd2b9bfd1fccbbf0214da
Author: Lucas Bradstreet <lu...@confluent.io>
AuthorDate: Tue Nov 12 21:21:18 2019 -0800

    MINOR: refactor replica last sent HW updates due to performance regression (#7671)
    
    This change fixes a performance regression due to follower last seen highwatermark
    handling introduced in 23beeea. maybeUpdateHwAndSendResponse is expensive for
    brokers with high partition counts, as it requires a partition and a replica lookup for every
    partition being fetched. This refactor moves the last seen watermark update into the follower
    fetch state update where we have already looked up the partition and replica.
    
    I've seen cases where maybeUpdateHwAndSendResponse is responsible 8% of CPU usage, not including the responseCallback call that is part of it.
    
    I have benchmarked this change with `UpdateFollowerFetchStateBenchmark` and it adds 5ns
    of overhead to Partition.updateFollowerFetchState, which is a rounding error compared to the
    current overhead of maybeUpdateHwAndSendResponse.
    
    Reviewers: David Arthur <mu...@gmail.com>, Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 11 ++++----
 core/src/main/scala/kafka/cluster/Replica.scala    |  6 +++--
 .../main/scala/kafka/server/ReplicaManager.scala   | 30 +++++-----------------
 .../kafka/server/DelayedFetchTest.scala            |  3 +--
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 30 ++++++++++++++--------
 .../unit/kafka/server/IsrExpirationTest.scala      | 20 ++++++++++-----
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |  3 ++-
 .../UpdateFollowerFetchStateBenchmark.java         |  8 +++---
 8 files changed, 57 insertions(+), 54 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 5372258..bbbf0ca 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -539,9 +539,8 @@ class Partition(val topicPartition: TopicPartition,
             followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
             followerStartOffset = Log.UnknownOffset,
             followerFetchTimeMs = 0L,
-            leaderEndOffset = Log.UnknownOffset
-          )
-          replica.updateLastSentHighWatermark(0L)
+            leaderEndOffset = Log.UnknownOffset,
+            lastSentHighwatermark = 0L)
         }
       }
       // we may need to increment high watermark since ISR could be down to 1
@@ -599,7 +598,8 @@ class Partition(val topicPartition: TopicPartition,
                                followerFetchOffsetMetadata: LogOffsetMetadata,
                                followerStartOffset: Long,
                                followerFetchTimeMs: Long,
-                               leaderEndOffset: Long): Boolean = {
+                               leaderEndOffset: Long,
+                               lastSentHighwatermark: Long): Boolean = {
     getReplica(followerId) match {
       case Some(followerReplica) =>
         // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
@@ -609,7 +609,8 @@ class Partition(val topicPartition: TopicPartition,
           followerFetchOffsetMetadata,
           followerStartOffset,
           followerFetchTimeMs,
-          leaderEndOffset)
+          leaderEndOffset,
+          lastSentHighwatermark)
 
         val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
         // check if the LW of the partition has incremented
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 5504db5..f9de7ba 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -71,7 +71,8 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
   def updateFetchState(followerFetchOffsetMetadata: LogOffsetMetadata,
                        followerStartOffset: Long,
                        followerFetchTimeMs: Long,
-                       leaderEndOffset: Long): Unit = {
+                       leaderEndOffset: Long,
+                       lastSentHighwatermark: Long): Unit = {
     if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset)
       _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, followerFetchTimeMs)
     else if (followerFetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
@@ -81,6 +82,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
     _logEndOffsetMetadata = followerFetchOffsetMetadata
     lastFetchLeaderLogEndOffset = leaderEndOffset
     lastFetchTimeMs = followerFetchTimeMs
+    updateLastSentHighWatermark(lastSentHighwatermark)
     trace(s"Updated state of replica to $this")
   }
 
@@ -92,7 +94,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
     * When handling fetches, the last sent high watermark for a replica is checked to see if we should return immediately
     * in order to propagate the HW more expeditiously. See KIP-392
     */
-  def updateLastSentHighWatermark(highWatermark: Long): Unit = {
+  private def updateLastSentHighWatermark(highWatermark: Long): Unit = {
     _lastSentHighWatermark = highWatermark
     trace(s"Updated HW of replica to $highWatermark")
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ff39339..a59c1b5 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -889,16 +889,6 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
 
-    // Wrap the given callback function with another function that will update the HW for the remote follower
-    def maybeUpdateHwAndSendResponse(fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-      if (isFromFollower) {
-        fetchPartitionData.foreach {
-          case (tp, partitionData) => updateFollowerHighWatermark(tp, replicaId, partitionData.highWatermark)
-        }
-      }
-      responseCallback(fetchPartitionData)
-    }
-
     // respond immediately if 1) fetch request does not want to wait
     //                        2) fetch request does not require any data
     //                        3) has enough data to respond
@@ -909,7 +899,7 @@ class ReplicaManager(val config: KafkaConfig,
         tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
           result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica)
       }
-      maybeUpdateHwAndSendResponse(fetchPartitionData)
+      responseCallback(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
       val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
@@ -922,7 +912,7 @@ class ReplicaManager(val config: KafkaConfig,
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
         fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
-        maybeUpdateHwAndSendResponse)
+        responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
       val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
@@ -1559,12 +1549,13 @@ class ReplicaManager(val config: KafkaConfig,
               followerFetchOffsetMetadata = readResult.info.fetchOffsetMetadata,
               followerStartOffset = readResult.followerLogStartOffset,
               followerFetchTimeMs = readResult.fetchTimeMs,
-              leaderEndOffset = readResult.leaderLogEndOffset)) {
+              leaderEndOffset = readResult.leaderLogEndOffset,
+              lastSentHighwatermark = readResult.highWatermark)) {
               readResult
             } else {
               warn(s"Leader $localBrokerId failed to record follower $followerId's position " +
-                s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
-                s"one of the assigned replicas ${partition.allReplicaIds.mkString(",")} " +
+                s"${readResult.info.fetchOffsetMetadata.messageOffset}, and last sent HW since the replica " +
+                s"is not recognized to be one of the assigned replicas ${partition.allReplicaIds.mkString(",")} " +
                 s"for partition $topicPartition. Empty records will be returned for this partition.")
               readResult.withEmptyFetchInfo
             }
@@ -1577,15 +1568,6 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def updateFollowerHighWatermark(topicPartition: TopicPartition, followerId: Int, highWatermark: Long): Unit = {
-    nonOfflinePartition(topicPartition).flatMap(_.getReplica(followerId)) match {
-      case Some(replica) => replica.updateLastSentHighWatermark(highWatermark)
-      case None =>
-        warn(s"While updating the HW for follower $followerId for partition $topicPartition, " +
-          s"the replica could not be found.")
-    }
-  }
-
   private def leaderPartitionsIterator: Iterator[Partition] =
     nonOfflinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined)
 
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 8eb23f3..317d8f7 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -161,8 +161,7 @@ class DelayedFetchTest extends EasyMockSupport {
 
     val follower = new Replica(replicaId, topicPartition)
     followerHW.foreach(hw => {
-      follower.updateFetchState(LogOffsetMetadata.UnknownOffsetMetadata, 0L, 0L, 0L)
-      follower.updateLastSentHighWatermark(hw)
+      follower.updateFetchState(LogOffsetMetadata.UnknownOffsetMetadata, 0L, 0L, 0L, hw)
     })
     EasyMock.expect(partition.getReplica(replicaId))
         .andReturn(Some(follower))
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 6204174..133ce1d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -511,7 +511,8 @@ class PartitionTest {
         followerFetchOffsetMetadata = fetchOffsetMetadata,
         followerStartOffset = 0L,
         followerFetchTimeMs = time.milliseconds(),
-        leaderEndOffset = partition.localLogOrException.logEndOffset)
+        leaderEndOffset = partition.localLogOrException.logEndOffset,
+        lastSentHighwatermark = partition.localLogOrException.highWatermark)
     }
 
     def fetchOffsetsForTimestamp(timestamp: Long, isolation: Option[IsolationLevel]): Either[ApiException, Option[TimestampAndOffset]] = {
@@ -886,7 +887,8 @@ class PartitionTest {
         followerFetchOffsetMetadata = fetchOffsetMetadata,
         followerStartOffset = 0L,
         followerFetchTimeMs = time.milliseconds(),
-        leaderEndOffset = partition.localLogOrException.logEndOffset)
+        leaderEndOffset = partition.localLogOrException.logEndOffset,
+        lastSentHighwatermark = partition.localLogOrException.highWatermark)
     }
 
     updateFollowerFetchState(follower2, LogOffsetMetadata(0))
@@ -1121,7 +1123,8 @@ class PartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(3),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 6L)
+      leaderEndOffset = 6L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
 
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(3L, remoteReplica.logEndOffset)
@@ -1133,7 +1136,8 @@ class PartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(6L),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 6L)
+      leaderEndOffset = 6L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
 
     assertEquals(time.milliseconds(), remoteReplica.lastCaughtUpTimeMs)
     assertEquals(6L, remoteReplica.logEndOffset)
@@ -1181,7 +1185,8 @@ class PartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(3),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 6L)
+      leaderEndOffset = 6L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
 
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
     assertEquals(3L, remoteReplica.logEndOffset)
@@ -1199,7 +1204,8 @@ class PartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(10),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 6L)
+      leaderEndOffset = 6L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
 
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(10L, remoteReplica.logEndOffset)
@@ -1252,7 +1258,8 @@ class PartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(10),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 10L)
+      leaderEndOffset = 10L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
 
     // Follower state is updated, but the ISR has not expanded
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@@ -1363,7 +1370,8 @@ class PartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(5),
       followerStartOffset = 0L,
       followerFetchTimeMs = firstFetchTimeMs,
-      leaderEndOffset = 10L)
+      leaderEndOffset = 10L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(5L, partition.localLogOrException.highWatermark)
     assertEquals(5L, remoteReplica.logEndOffset)
@@ -1377,7 +1385,8 @@ class PartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(10),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 15L)
+      leaderEndOffset = 15L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
     assertEquals(firstFetchTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(10L, partition.localLogOrException.highWatermark)
     assertEquals(10L, remoteReplica.logEndOffset)
@@ -1433,7 +1442,8 @@ class PartitionTest {
       followerFetchOffsetMetadata = LogOffsetMetadata(10),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
-      leaderEndOffset = 10L)
+      leaderEndOffset = 10L,
+      lastSentHighwatermark = partition.localLogOrException.highWatermark)
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(10L, partition.localLogOrException.highWatermark)
     assertEquals(10L, remoteReplica.logEndOffset)
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index e43283e..a71b24c 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -39,6 +39,7 @@ class IsrExpirationTest {
   val replicaLagTimeMaxMs = 100L
   val replicaFetchWaitMaxMs = 100
   val leaderLogEndOffset = 20
+  val leaderLogHighWatermark = 20L
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
@@ -85,7 +86,8 @@ class IsrExpirationTest {
         followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
-        leaderEndOffset = leaderLogEndOffset)
+        leaderEndOffset = leaderLogEndOffset,
+        lastSentHighwatermark = partition0.localLogOrException.highWatermark)
     var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
 
@@ -134,7 +136,8 @@ class IsrExpirationTest {
         followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 2),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
-        leaderEndOffset = leaderLogEndOffset)
+        leaderEndOffset = leaderLogEndOffset,
+        lastSentHighwatermark = partition0.localLogOrException.highWatermark)
 
     // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
     // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck
@@ -148,7 +151,8 @@ class IsrExpirationTest {
         followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset - 1),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
-        leaderEndOffset = leaderLogEndOffset)
+        leaderEndOffset = leaderLogEndOffset,
+        lastSentHighwatermark = partition0.localLogOrException.highWatermark)
     }
     partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -165,7 +169,8 @@ class IsrExpirationTest {
         followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
-        leaderEndOffset = leaderLogEndOffset)
+        leaderEndOffset = leaderLogEndOffset,
+        lastSentHighwatermark = partition0.localLogOrException.highWatermark)
     }
     partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -190,7 +195,8 @@ class IsrExpirationTest {
         followerFetchOffsetMetadata = LogOffsetMetadata(leaderLogEndOffset),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
-        leaderEndOffset = leaderLogEndOffset)
+        leaderEndOffset = leaderLogEndOffset,
+        lastSentHighwatermark = partition0.localLogOrException.highWatermark)
 
     var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR)
@@ -222,7 +228,8 @@ class IsrExpirationTest {
         followerFetchOffsetMetadata = LogOffsetMetadata(0L),
         followerStartOffset = 0L,
         followerFetchTimeMs= time.milliseconds,
-        leaderEndOffset = 0L)
+        leaderEndOffset = 0L,
+        lastSentHighwatermark = partition.localLogOrException.highWatermark)
 
     // set the leader and its hw and the hw update time
     partition.leaderReplicaIdOpt = Some(leaderId)
@@ -234,6 +241,7 @@ class IsrExpirationTest {
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
     EasyMock.expect(log.logEndOffset).andReturn(leaderLogEndOffset).anyTimes()
+    EasyMock.expect(log.highWatermark).andReturn(leaderLogHighWatermark).anyTimes()
     EasyMock.replay(log)
     log
   }
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index dd15914..26cd941 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -139,7 +139,8 @@ class SimpleFetchTest {
       followerFetchOffsetMetadata = leo,
       followerStartOffset = 0L,
       followerFetchTimeMs= time.milliseconds,
-      leaderEndOffset = leo.messageOffset)
+      leaderEndOffset = leo.messageOffset,
+      partition.localLogOrException.highWatermark)
   }
 
   @After
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 35375a4..c3e0746 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -161,9 +161,9 @@ public class UpdateFollowerFetchStateBenchmark {
     public void updateFollowerFetchStateBench() {
         // measure the impact of two follower fetches on the leader
         partition.updateFollowerFetchState(1, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-                0, 1, nextOffset);
+                0, 1, nextOffset, nextOffset);
         partition.updateFollowerFetchState(2, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-                0, 1, nextOffset);
+                0, 1, nextOffset, nextOffset);
         nextOffset++;
     }
 
@@ -173,8 +173,8 @@ public class UpdateFollowerFetchStateBenchmark {
         // measure the impact of two follower fetches on the leader when the follower didn't
         // end up fetching anything
         partition.updateFollowerFetchState(1, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-                0, 1, 100);
+                0, 1, 100, nextOffset);
         partition.updateFollowerFetchState(2, new LogOffsetMetadata(nextOffset, nextOffset, 0),
-                0, 1, 100);
+                0, 1, 100, nextOffset);
     }
 }