You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/04/11 19:09:38 UTC
git commit: KAFKA-860 Replica fetcher thread errors out and dies
during rolling bounce of cluster; reviewed by Jun Rao, Jay Kreps
Updated Branches:
refs/heads/0.8 6dbf9212a -> f9e36ca79
KAFKA-860 Replica fetcher thread errors out and dies during rolling bounce of cluster; reviewed by Jun Rao, Jay Kreps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9e36ca7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9e36ca7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9e36ca7
Branch: refs/heads/0.8
Commit: f9e36ca790060a12a51569169cc4a53c4ebc5d74
Parents: 6dbf921
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Apr 11 10:09:24 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Apr 11 10:09:24 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/cluster/Partition.scala | 26 +++++++++++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 13 ++++---
.../main/scala/kafka/server/ReplicaManager.scala | 4 +-
3 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f9e36ca7/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index cbac5d0..9a29fb2 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,9 +23,10 @@ import kafka.api.LeaderAndIsr
import kafka.server.ReplicaManager
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ErrorMapping
+import kafka.common.{NotLeaderForPartitionException, ErrorMapping}
import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
import org.apache.log4j.Logger
+import kafka.message.ByteBufferMessageSet
/**
@@ -259,7 +260,11 @@ class Partition(val topic: String,
}
}
- def maybeIncrementLeaderHW(leaderReplica: Replica) {
+ /**
+ * There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock
+ * @param leaderReplica
+ */
+ private def maybeIncrementLeaderHW(leaderReplica: Replica) {
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
val newHighWatermark = allLogEndOffsets.min
val oldHighWatermark = leaderReplica.highWatermark
@@ -315,6 +320,23 @@ class Partition(val topic: String,
stuckReplicas ++ slowReplicas
}
+ def appendMessagesToLeader(messages: ByteBufferMessageSet): (Long, Long) = {
+ leaderIsrUpdateLock synchronized {
+ val leaderReplicaOpt = leaderReplicaIfLocal()
+ leaderReplicaOpt match {
+ case Some(leaderReplica) =>
+ val log = leaderReplica.log.get
+ val (start, end) = log.append(messages, assignOffsets = true)
+ // we may need to increment high watermark since ISR could be down to 1
+ maybeIncrementLeaderHW(leaderReplica)
+ (start, end)
+ case None =>
+ throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
+ .format(topic, partitionId, localBrokerId))
+ }
+ }
+ }
+
private def updateIsr(newIsr: Set[Replica]) {
debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(",")))
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f9e36ca7/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7ee81a1..6b6f8f2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -188,11 +188,14 @@ class KafkaApis(val requestChannel: RequestChannel,
BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
try {
- val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
- val log = localReplica.log.get
- val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
- // we may need to increment high watermark since ISR could be down to 1
- localReplica.partition.maybeIncrementLeaderHW(localReplica)
+ val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
+ val (start, end) =
+ partitionOpt match {
+ case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+ case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+ .format(topicAndPartition, brokerId))
+
+ }
trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
.format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end))
ProduceResult(topicAndPartition, start, end)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f9e36ca7/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0d39a57..89ad4d7 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -175,12 +175,12 @@ class ReplicaManager(val config: KafkaConfig,
val partitionOpt = getPartition(topic, partitionId)
partitionOpt match {
case None =>
- throw new UnknownTopicOrPartitionException("Topic %s partition %d doesn't exist on %d".format(topic, partitionId, config.brokerId))
+ throw new UnknownTopicOrPartitionException("Partition [%s,%d] doesn't exist on %d".format(topic, partitionId, config.brokerId))
case Some(partition) =>
partition.leaderReplicaIfLocal match {
case Some(leaderReplica) => leaderReplica
case None =>
- throw new NotLeaderForPartitionException("Leader not local for topic %s partition %d on broker %d"
+ throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
.format(topic, partitionId, config.brokerId))
}
}