You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/11/11 20:05:00 UTC

[kafka] branch 2.4 updated: KAFKA-9171: Handle ReplicaNotAvailableException during DelayedFetch (#7678)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 2f42737  KAFKA-9171: Handle ReplicaNotAvailableException during DelayedFetch (#7678)
2f42737 is described below

commit 2f4273705d3915362336e36b9b6887b1a8aa9304
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon Nov 11 20:03:06 2019 +0000

    KAFKA-9171: Handle ReplicaNotAvailableException during DelayedFetch (#7678)
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../src/main/scala/kafka/server/DelayedFetch.scala | 42 +++++++++++----------
 .../kafka/server/DelayedFetchTest.scala            | 43 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 849fb1d..7e0da30 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -68,12 +68,13 @@ class DelayedFetch(delayMs: Long,
    * The operation can be completed if:
    *
    * Case A: This broker is no longer the leader for some partitions it tries to fetch
-   * Case B: This broker does not know of some partitions it tries to fetch
-   * Case C: The fetch offset locates not on the last segment of the log
-   * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
-   * Case E: The partition is in an offline log directory on this broker
-   * Case F: This broker is the leader, but the requested epoch is now fenced
-   * Case G: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
+   * Case B: The replica is no longer available on this broker
+   * Case C: This broker does not know of some partitions it tries to fetch
+   * Case D: The partition is in an offline log directory on this broker
+   * Case E: This broker is the leader, but the requested epoch is now fenced
+   * Case F: The fetch offset locates not on the last segment of the log
+   * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+   * Case H: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
    * Upon completion, should return whatever data is available for each valid partition
    */
   override def tryComplete(): Boolean = {
@@ -94,16 +95,16 @@ class DelayedFetch(delayMs: Long,
               case FetchTxnCommitted => offsetSnapshot.lastStableOffset
             }
 
-            // Go directly to the check for Case D if the message offsets are the same. If the log segment
+            // Go directly to the check for Case G if the message offsets are the same. If the log segment
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
-            // which would incorrectly be seen as an instance of Case C.
+            // which would incorrectly be seen as an instance of Case F.
             if (endOffset.messageOffset != fetchOffset.messageOffset) {
               if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case C, this can happen when the new fetch operation is on a truncated leader
+                // Case F, this can happen when the new fetch operation is on a truncated leader
                 debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
                 return forceComplete()
               } else if (fetchOffset.onOlderSegment(endOffset)) {
-                // Case C, this can happen when the fetch operation is falling behind the current segment
+                // Case F, this can happen when the fetch operation is falling behind the current segment
                 // or the partition has just rolled a new segment
                 debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
                 // We will not force complete the fetch request if a replica should be throttled.
@@ -118,7 +119,7 @@ class DelayedFetch(delayMs: Long,
             }
 
             if (fetchMetadata.isFromFollower) {
-              // Case G check if the follower has the latest HW from the leader
+              // Case H check if the follower has the latest HW from the leader
               if (partition.getReplica(fetchMetadata.replicaId)
                 .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
                 return forceComplete()
@@ -126,23 +127,26 @@ class DelayedFetch(delayMs: Long,
             }
           }
         } catch {
-          case _: KafkaStorageException => // Case E
-            debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
+          case _: NotLeaderForPartitionException =>  // Case A
+            debug(s"Broker is no longer the leader of $topicPartition, satisfy $fetchMetadata immediately")
             return forceComplete()
-          case _: UnknownTopicOrPartitionException => // Case B
+          case _: ReplicaNotAvailableException =>  // Case B
+            debug(s"Broker no longer has a replica of $topicPartition, satisfy $fetchMetadata immediately")
+            return forceComplete()
+          case _: UnknownTopicOrPartitionException => // Case C
             debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
             return forceComplete()
-          case _: FencedLeaderEpochException => // Case F
+          case _: KafkaStorageException => // Case D
+            debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
+            return forceComplete()
+          case _: FencedLeaderEpochException => // Case E
             debug(s"Broker is the leader of partition $topicPartition, but the requested epoch " +
               s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchMetadata immediately")
             return forceComplete()
-          case _: NotLeaderForPartitionException =>  // Case A
-            debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
-            return forceComplete()
         }
     }
 
-    // Case D
+    // Case G
     if (accumulatedSize >= fetchMetadata.fetchMinBytes)
        forceComplete()
     else
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index d41bc5b..806916f 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -19,11 +19,10 @@ package kafka.server
 import java.util.Optional
 
 import scala.collection.Seq
-
 import kafka.cluster.{Partition, Replica}
 import kafka.log.LogOffsetSnapshot
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.FencedLeaderEpochException
+import org.apache.kafka.common.errors.{FencedLeaderEpochException, ReplicaNotAvailableException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
@@ -81,6 +80,46 @@ class DelayedFetchTest extends EasyMockSupport {
     assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.error)
   }
 
+  @Test
+  def testReplicaNotAvailable(): Unit = {
+    val topicPartition = new TopicPartition("topic", 0)
+    val fetchOffset = 500L
+    val logStartOffset = 0L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      delayMs = 500,
+      fetchMetadata = fetchMetadata,
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      clientMetadata = None,
+      responseCallback = callback)
+
+    val partition: Partition = mock(classOf[Partition])
+
+    EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
+      .andThrow(new ReplicaNotAvailableException(s"Replica for $topicPartition not available"))
+    expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.REPLICA_NOT_AVAILABLE)
+    EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
+
+    replayAll()
+
+    assertTrue(delayedFetch.tryComplete())
+    assertTrue(delayedFetch.isCompleted)
+    assertTrue(fetchResultOpt.isDefined)
+  }
+
   def checkCompleteWhenFollowerLaggingHW(followerHW: Option[Long], checkResult: DelayedFetch => Unit): Unit = {
     val topicPartition = new TopicPartition("topic", 0)
     val fetchOffset = 500L