Posted to commits@kafka.apache.org by rs...@apache.org on 2019/11/11 20:05:00 UTC
[kafka] branch 2.4 updated: KAFKA-9171: Handle ReplicaNotAvailableException during DelayedFetch (#7678)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 2f42737 KAFKA-9171: Handle ReplicaNotAvailableException during DelayedFetch (#7678)
2f42737 is described below
commit 2f4273705d3915362336e36b9b6887b1a8aa9304
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon Nov 11 20:03:06 2019 +0000
KAFKA-9171: Handle ReplicaNotAvailableException during DelayedFetch (#7678)
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../src/main/scala/kafka/server/DelayedFetch.scala | 42 +++++++++++----------
.../kafka/server/DelayedFetchTest.scala | 43 +++++++++++++++++++++-
2 files changed, 64 insertions(+), 21 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 849fb1d..7e0da30 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -68,12 +68,13 @@ class DelayedFetch(delayMs: Long,
* The operation can be completed if:
*
* Case A: This broker is no longer the leader for some partitions it tries to fetch
- * Case B: This broker does not know of some partitions it tries to fetch
- * Case C: The fetch offset locates not on the last segment of the log
- * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
- * Case E: The partition is in an offline log directory on this broker
- * Case F: This broker is the leader, but the requested epoch is now fenced
- * Case G: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
+ * Case B: The replica is no longer available on this broker
+ * Case C: This broker does not know of some partitions it tries to fetch
+ * Case D: The partition is in an offline log directory on this broker
+ * Case E: This broker is the leader, but the requested epoch is now fenced
+ * Case F: The fetch offset locates not on the last segment of the log
+ * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+ * Case H: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
* Upon completion, should return whatever data is available for each valid partition
*/
override def tryComplete(): Boolean = {
@@ -94,16 +95,16 @@ class DelayedFetch(delayMs: Long,
case FetchTxnCommitted => offsetSnapshot.lastStableOffset
}
- // Go directly to the check for Case D if the message offsets are the same. If the log segment
+ // Go directly to the check for Case G if the message offsets are the same. If the log segment
// has just rolled, then the high watermark offset will remain the same but be on the old segment,
- // which would incorrectly be seen as an instance of Case C.
+ // which would incorrectly be seen as an instance of Case F.
if (endOffset.messageOffset != fetchOffset.messageOffset) {
if (endOffset.onOlderSegment(fetchOffset)) {
- // Case C, this can happen when the new fetch operation is on a truncated leader
+ // Case F, this can happen when the new fetch operation is on a truncated leader
debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
return forceComplete()
} else if (fetchOffset.onOlderSegment(endOffset)) {
- // Case C, this can happen when the fetch operation is falling behind the current segment
+ // Case F, this can happen when the fetch operation is falling behind the current segment
// or the partition has just rolled a new segment
debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
// We will not force complete the fetch request if a replica should be throttled.
@@ -118,7 +119,7 @@ class DelayedFetch(delayMs: Long,
}
if (fetchMetadata.isFromFollower) {
- // Case G check if the follower has the latest HW from the leader
+ // Case H check if the follower has the latest HW from the leader
if (partition.getReplica(fetchMetadata.replicaId)
.exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
return forceComplete()
@@ -126,23 +127,26 @@ class DelayedFetch(delayMs: Long,
}
}
} catch {
- case _: KafkaStorageException => // Case E
- debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
+ case _: NotLeaderForPartitionException => // Case A
+ debug(s"Broker is no longer the leader of $topicPartition, satisfy $fetchMetadata immediately")
return forceComplete()
- case _: UnknownTopicOrPartitionException => // Case B
+ case _: ReplicaNotAvailableException => // Case B
+ debug(s"Broker no longer has a replica of $topicPartition, satisfy $fetchMetadata immediately")
+ return forceComplete()
+ case _: UnknownTopicOrPartitionException => // Case C
debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
return forceComplete()
- case _: FencedLeaderEpochException => // Case F
+ case _: KafkaStorageException => // Case D
+ debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
+ return forceComplete()
+ case _: FencedLeaderEpochException => // Case E
debug(s"Broker is the leader of partition $topicPartition, but the requested epoch " +
s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchMetadata immediately")
return forceComplete()
- case _: NotLeaderForPartitionException => // Case A
- debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
- return forceComplete()
}
}
- // Case D
+ // Case G
if (accumulatedSize >= fetchMetadata.fetchMinBytes)
forceComplete()
else
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index d41bc5b..806916f 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -19,11 +19,10 @@ package kafka.server
import java.util.Optional
import scala.collection.Seq
-
import kafka.cluster.{Partition, Replica}
import kafka.log.LogOffsetSnapshot
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.FencedLeaderEpochException
+import org.apache.kafka.common.errors.{FencedLeaderEpochException, ReplicaNotAvailableException}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
@@ -81,6 +80,46 @@ class DelayedFetchTest extends EasyMockSupport {
assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.error)
}
+ @Test
+ def testReplicaNotAvailable(): Unit = {
+ val topicPartition = new TopicPartition("topic", 0)
+ val fetchOffset = 500L
+ val logStartOffset = 0L
+ val currentLeaderEpoch = Optional.of[Integer](10)
+ val replicaId = 1
+
+ val fetchStatus = FetchPartitionStatus(
+ startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+ fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus)
+
+ var fetchResultOpt: Option[FetchPartitionData] = None
+ def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+ fetchResultOpt = Some(responses.head._2)
+ }
+
+ val delayedFetch = new DelayedFetch(
+ delayMs = 500,
+ fetchMetadata = fetchMetadata,
+ replicaManager = replicaManager,
+ quota = replicaQuota,
+ clientMetadata = None,
+ responseCallback = callback)
+
+ val partition: Partition = mock(classOf[Partition])
+
+ EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
+ .andThrow(new ReplicaNotAvailableException(s"Replica for $topicPartition not available"))
+ expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.REPLICA_NOT_AVAILABLE)
+ EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
+
+ replayAll()
+
+ assertTrue(delayedFetch.tryComplete())
+ assertTrue(delayedFetch.isCompleted)
+ assertTrue(fetchResultOpt.isDefined)
+ }
+
def checkCompleteWhenFollowerLaggingHW(followerHW: Option[Long], checkResult: DelayedFetch => Unit): Unit = {
val topicPartition = new TopicPartition("topic", 0)
val fetchOffset = 500L