You are viewing a plain-text version of this content. The link to the canonical version of this message was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/04/11 19:09:38 UTC

git commit: KAFKA-860 Replica fetcher thread errors out and dies during rolling bounce of cluster; reviewed by Jun Rao, Jay Kreps

Updated Branches:
  refs/heads/0.8 6dbf9212a -> f9e36ca79


KAFKA-860 Replica fetcher thread errors out and dies during rolling bounce of cluster; reviewed by Jun Rao, Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9e36ca7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9e36ca7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9e36ca7

Branch: refs/heads/0.8
Commit: f9e36ca790060a12a51569169cc4a53c4ebc5d74
Parents: 6dbf921
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Apr 11 10:09:24 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Apr 11 10:09:24 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Partition.scala  |   26 +++++++++++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   13 ++++---
 .../main/scala/kafka/server/ReplicaManager.scala   |    4 +-
 3 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f9e36ca7/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index cbac5d0..9a29fb2 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,9 +23,10 @@ import kafka.api.LeaderAndIsr
 import kafka.server.ReplicaManager
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ErrorMapping
+import kafka.common.{NotLeaderForPartitionException, ErrorMapping}
 import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 import org.apache.log4j.Logger
+import kafka.message.ByteBufferMessageSet
 
 
 /**
@@ -259,7 +260,11 @@ class Partition(val topic: String,
     }
   }
 
-  def maybeIncrementLeaderHW(leaderReplica: Replica) {
+  /**
+   * There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock
+   * @param leaderReplica
+   */
+  private def maybeIncrementLeaderHW(leaderReplica: Replica) {
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min
     val oldHighWatermark = leaderReplica.highWatermark
@@ -315,6 +320,23 @@ class Partition(val topic: String,
     stuckReplicas ++ slowReplicas
   }
 
+  def appendMessagesToLeader(messages: ByteBufferMessageSet): (Long, Long) = {
+    leaderIsrUpdateLock synchronized {
+      val leaderReplicaOpt = leaderReplicaIfLocal()
+      leaderReplicaOpt match {
+        case Some(leaderReplica) =>
+          val log = leaderReplica.log.get
+          val (start, end) = log.append(messages, assignOffsets = true)
+          // we may need to increment high watermark since ISR could be down to 1
+          maybeIncrementLeaderHW(leaderReplica)
+          (start, end)
+        case None =>
+          throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
+            .format(topic, partitionId, localBrokerId))
+      }
+    }
+  }
+
   private def updateIsr(newIsr: Set[Replica]) {
     debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(",")))
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9e36ca7/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7ee81a1..6b6f8f2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -188,11 +188,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
 
       try {
-        val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
-        val log = localReplica.log.get
-        val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
-        // we may need to increment high watermark since ISR could be down to 1
-        localReplica.partition.maybeIncrementLeaderHW(localReplica)
+        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
+        val (start, end) =
+          partitionOpt match {
+            case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+            case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+              .format(topicAndPartition, brokerId))
+
+          }
         trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
               .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end))
         ProduceResult(topicAndPartition, start, end)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9e36ca7/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0d39a57..89ad4d7 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -175,12 +175,12 @@ class ReplicaManager(val config: KafkaConfig,
     val partitionOpt = getPartition(topic, partitionId)
     partitionOpt match {
       case None =>
-        throw new UnknownTopicOrPartitionException("Topic %s partition %d doesn't exist on %d".format(topic, partitionId, config.brokerId))
+        throw new UnknownTopicOrPartitionException("Partition [%s,%d] doesn't exist on %d".format(topic, partitionId, config.brokerId))
       case Some(partition) =>
         partition.leaderReplicaIfLocal match {
           case Some(leaderReplica) => leaderReplica
           case None =>
-            throw new NotLeaderForPartitionException("Leader not local for topic %s partition %d on broker %d"
+            throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
                     .format(topic, partitionId, config.brokerId))
         }
     }