You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/17 15:53:24 UTC

kafka git commit: KAFKA-6046; DeleteRecordsRequest to a follower should return NOT_LEADER

Repository: kafka
Updated Branches:
  refs/heads/trunk ad71b9d06 -> 406071c2b


KAFKA-6046; DeleteRecordsRequest to a follower should return NOT_LEADER

Tested with Tom Bentley's DeleteRecordsRequestTest, which is part of
a separate pull request.

Author: tedyu <yu...@gmail.com>

Reviewers: Tom Bentley <tb...@redhat.com>, Ismael Juma <is...@juma.me.uk>

Closes #4052 from tedyu/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/406071c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/406071c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/406071c2

Branch: refs/heads/trunk
Commit: 406071c2b5c06b25f9167ffd566325fdb65b268d
Parents: ad71b9d
Author: tedyu <yu...@gmail.com>
Authored: Fri Nov 17 15:16:25 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Nov 17 15:18:48 2017 +0000

----------------------------------------------------------------------
 .../scala/kafka/server/ReplicaManager.scala     | 26 ++++++--------------
 1 file changed, 8 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/406071c2/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2de50d8..886e80e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -418,6 +418,11 @@ class ReplicaManager(val config: KafkaConfig,
   def getReplicaOrException(topicPartition: TopicPartition): Replica = getReplicaOrException(topicPartition, localBrokerId)
 
   def getLeaderReplicaIfLocal(topicPartition: TopicPartition): Replica =  {
+    val (_, replica) = getPartitionAndLeaderReplicaIfLocal(topicPartition)
+    replica
+  }
+
+  def getPartitionAndLeaderReplicaIfLocal(topicPartition: TopicPartition): (Partition, Replica) =  {
     val partitionOpt = getPartition(topicPartition)
     partitionOpt match {
       case None =>
@@ -426,7 +431,7 @@ class ReplicaManager(val config: KafkaConfig,
         if (partition eq ReplicaManager.OfflinePartition)
           throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
         else partition.leaderReplicaIfLocal match {
-          case Some(leaderReplica) => leaderReplica
+          case Some(leaderReplica) => (partition, leaderReplica)
           case None =>
             throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
         }
@@ -514,23 +519,10 @@ class ReplicaManager(val config: KafkaConfig,
         (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
       } else {
         try {
-          val partition = getPartition(topicPartition) match {
-            case Some(p) =>
-              if (p eq ReplicaManager.OfflinePartition)
-                throw new KafkaStorageException("Partition %s is in an offline log directory on broker %d".format(topicPartition, localBrokerId))
-              p
-            case None =>
-              throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId))
-          }
+          val (partition, replica) = getPartitionAndLeaderReplicaIfLocal(topicPartition)
           val convertedOffset =
             if (requestedOffset == DeleteRecordsRequest.HIGH_WATERMARK) {
-              partition.leaderReplicaIfLocal match {
-                case Some(leaderReplica) =>
-                  leaderReplica.highWatermark.messageOffset
-                case None =>
-                  throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
-                    .format(topicPartition, localBrokerId))
-              }
+              replica.highWatermark.messageOffset
             } else
               requestedOffset
           if (convertedOffset < 0)
@@ -539,8 +531,6 @@ class ReplicaManager(val config: KafkaConfig,
           val lowWatermark = partition.deleteRecordsOnLeader(convertedOffset)
           (topicPartition, LogDeleteRecordsResult(convertedOffset, lowWatermark))
         } catch {
-          // NOTE: Failed produce requests metric is not incremented for known exceptions
-          // it is supposed to indicate un-expected failures of a broker in handling a produce request
           case e@ (_: UnknownTopicOrPartitionException |
                    _: NotLeaderForPartitionException |
                    _: OffsetOutOfRangeException |