You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/17 15:53:24 UTC
kafka git commit: KAFKA-6046;
DeleteRecordsRequest to a follower should return NOT_LEADER
Repository: kafka
Updated Branches:
refs/heads/trunk ad71b9d06 -> 406071c2b
KAFKA-6046; DeleteRecordsRequest to a follower should return NOT_LEADER
Tested with DeleteRecordsRequestTest by Tom Bentley, which is part of
a separate pull request.
Author: tedyu <yu...@gmail.com>
Reviewers: Tom Bentley <tb...@redhat.com>, Ismael Juma <is...@juma.me.uk>
Closes #4052 from tedyu/trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/406071c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/406071c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/406071c2
Branch: refs/heads/trunk
Commit: 406071c2b5c06b25f9167ffd566325fdb65b268d
Parents: ad71b9d
Author: tedyu <yu...@gmail.com>
Authored: Fri Nov 17 15:16:25 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Nov 17 15:18:48 2017 +0000
----------------------------------------------------------------------
.../scala/kafka/server/ReplicaManager.scala | 26 ++++++--------------
1 file changed, 8 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/406071c2/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2de50d8..886e80e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -418,6 +418,11 @@ class ReplicaManager(val config: KafkaConfig,
def getReplicaOrException(topicPartition: TopicPartition): Replica = getReplicaOrException(topicPartition, localBrokerId)
def getLeaderReplicaIfLocal(topicPartition: TopicPartition): Replica = {
+ val (_, replica) = getPartitionAndLeaderReplicaIfLocal(topicPartition)
+ replica
+ }
+
+ def getPartitionAndLeaderReplicaIfLocal(topicPartition: TopicPartition): (Partition, Replica) = {
val partitionOpt = getPartition(topicPartition)
partitionOpt match {
case None =>
@@ -426,7 +431,7 @@ class ReplicaManager(val config: KafkaConfig,
if (partition eq ReplicaManager.OfflinePartition)
throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
else partition.leaderReplicaIfLocal match {
- case Some(leaderReplica) => leaderReplica
+ case Some(leaderReplica) => (partition, leaderReplica)
case None =>
throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
}
@@ -514,23 +519,10 @@ class ReplicaManager(val config: KafkaConfig,
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
} else {
try {
- val partition = getPartition(topicPartition) match {
- case Some(p) =>
- if (p eq ReplicaManager.OfflinePartition)
- throw new KafkaStorageException("Partition %s is in an offline log directory on broker %d".format(topicPartition, localBrokerId))
- p
- case None =>
- throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId))
- }
+ val (partition, replica) = getPartitionAndLeaderReplicaIfLocal(topicPartition)
val convertedOffset =
if (requestedOffset == DeleteRecordsRequest.HIGH_WATERMARK) {
- partition.leaderReplicaIfLocal match {
- case Some(leaderReplica) =>
- leaderReplica.highWatermark.messageOffset
- case None =>
- throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
- .format(topicPartition, localBrokerId))
- }
+ replica.highWatermark.messageOffset
} else
requestedOffset
if (convertedOffset < 0)
@@ -539,8 +531,6 @@ class ReplicaManager(val config: KafkaConfig,
val lowWatermark = partition.deleteRecordsOnLeader(convertedOffset)
(topicPartition, LogDeleteRecordsResult(convertedOffset, lowWatermark))
} catch {
- // NOTE: Failed produce requests metric is not incremented for known exceptions
- // it is supposed to indicate un-expected failures of a broker in handling a produce request
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderForPartitionException |
_: OffsetOutOfRangeException |