You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/07/27 18:59:12 UTC
git commit: kafka-1549; dead brokers coming in the TopicMetadataResponse; patched by Nicu Marasoiu; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk d9e5080df -> 7f2278fb9
kafka-1549; dead brokers coming in the TopicMetadataResponse; patched by Nicu Marasoiu; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7f2278fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7f2278fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7f2278fb
Branch: refs/heads/trunk
Commit: 7f2278fb9de08141f21b017ba66752857dcdeba4
Parents: d9e5080
Author: Nicu Marasoiu <nm...@adobe.com>
Authored: Sun Jul 27 09:59:08 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Jul 27 09:59:08 2014 -0700
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 27 ++++++++++----------
.../main/scala/kafka/server/MetadataCache.scala | 17 ++++++------
core/src/main/scala/kafka/utils/Utils.scala | 11 +++++---
3 files changed, 29 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7f2278fb/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f2ca856..134aef9 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,8 +18,7 @@ package kafka.cluster
import kafka.common._
import kafka.admin.AdminUtils
-import kafka.utils.{ReplicationUtils, Pool, Time, Logging}
-import kafka.utils.Utils.inLock
+import kafka.utils._
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
import kafka.server.{OffsetManager, ReplicaManager}
@@ -29,7 +28,7 @@ import kafka.message.ByteBufferMessageSet
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.Some
+import kafka.utils.Utils.{inReadLock,inWriteLock}
import scala.collection._
import com.yammer.metrics.core.Gauge
@@ -73,7 +72,7 @@ class Partition(val topic: String,
)
def isUnderReplicated(): Boolean = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ inReadLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal() match {
case Some(_) =>
inSyncReplicas.size < assignedReplicas.size
@@ -115,7 +114,7 @@ class Partition(val topic: String,
}
def leaderReplicaIfLocal(): Option[Replica] = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ inReadLock(leaderIsrUpdateLock) {
leaderReplicaIdOpt match {
case Some(leaderReplicaId) =>
if (leaderReplicaId == localBrokerId)
@@ -141,7 +140,7 @@ class Partition(val topic: String,
def delete() {
// need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
- inLock(leaderIsrUpdateLock.writeLock()) {
+ inWriteLock(leaderIsrUpdateLock) {
assignedReplicaMap.clear()
inSyncReplicas = Set.empty[Replica]
leaderReplicaIdOpt = None
@@ -156,7 +155,7 @@ class Partition(val topic: String,
}
def getLeaderEpoch(): Int = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ inReadLock(leaderIsrUpdateLock) {
return this.leaderEpoch
}
}
@@ -168,7 +167,7 @@ class Partition(val topic: String,
def makeLeader(controllerId: Int,
partitionStateInfo: PartitionStateInfo, correlationId: Int,
offsetManager: OffsetManager): Boolean = {
- inLock(leaderIsrUpdateLock.writeLock()) {
+ inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -201,7 +200,7 @@ class Partition(val topic: String,
def makeFollower(controllerId: Int,
partitionStateInfo: PartitionStateInfo,
correlationId: Int, offsetManager: OffsetManager): Boolean = {
- inLock(leaderIsrUpdateLock.writeLock()) {
+ inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -235,7 +234,7 @@ class Partition(val topic: String,
}
def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
- inLock(leaderIsrUpdateLock.writeLock()) {
+ inWriteLock(leaderIsrUpdateLock) {
debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
val replicaOpt = getReplica(replicaId)
if(!replicaOpt.isDefined) {
@@ -271,7 +270,7 @@ class Partition(val topic: String,
}
def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ inReadLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal() match {
case Some(_) =>
val numAcks = inSyncReplicas.count(r => {
@@ -315,7 +314,7 @@ class Partition(val topic: String,
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
- inLock(leaderIsrUpdateLock.writeLock()) {
+ inWriteLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
@@ -357,7 +356,7 @@ class Partition(val topic: String,
}
def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ inReadLock(leaderIsrUpdateLock) {
val leaderReplicaOpt = leaderReplicaIfLocal()
leaderReplicaOpt match {
case Some(leaderReplica) =>
@@ -400,7 +399,7 @@ class Partition(val topic: String,
}
override def toString(): String = {
- inLock(leaderIsrUpdateLock.readLock()) {
+ inReadLock(leaderIsrUpdateLock) {
val partitionString = new StringBuilder
partitionString.append("Topic: " + topic)
partitionString.append("; Partition: " + partitionId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7f2278fb/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 7cd40e1..bf81a1a 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -25,7 +25,6 @@ import kafka.utils.Utils._
import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
import kafka.common.TopicAndPartition
import kafka.controller.KafkaController.StateChangeLogger
-import scala.Some
/**
* A cache for the state (e.g., current leader) of each partition. This cache is updated through
@@ -34,14 +33,14 @@ import scala.Some
private[server] class MetadataCache {
private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
- private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+ private var aliveBrokers: Map[Int, Broker] = Map()
private val partitionMetadataLock = new ReentrantReadWriteLock()
def getTopicMetadata(topics: Set[String]) = {
val isAllTopics = topics.isEmpty
val topicsRequested = if(isAllTopics) cache.keySet else topics
val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
- inLock(partitionMetadataLock.readLock()) {
+ inReadLock(partitionMetadataLock) {
for (topic <- topicsRequested) {
if (isAllTopics || cache.contains(topic)) {
val partitionStateInfos = cache(topic)
@@ -82,15 +81,15 @@ private[server] class MetadataCache {
}
def getAliveBrokers = {
- inLock(partitionMetadataLock.readLock()) {
- aliveBrokers.values.toList
+ inReadLock(partitionMetadataLock) {
+ aliveBrokers.values.toSeq
}
}
def addOrUpdatePartitionInfo(topic: String,
partitionId: Int,
stateInfo: PartitionStateInfo) {
- inLock(partitionMetadataLock.writeLock()) {
+ inWriteLock(partitionMetadataLock) {
cache.get(topic) match {
case Some(infos) => infos.put(partitionId, stateInfo)
case None => {
@@ -103,7 +102,7 @@ private[server] class MetadataCache {
}
def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = {
- inLock(partitionMetadataLock.readLock()) {
+ inReadLock(partitionMetadataLock) {
cache.get(topic) match {
case Some(partitionInfos) => partitionInfos.get(partitionId)
case None => None
@@ -114,8 +113,8 @@ private[server] class MetadataCache {
def updateCache(updateMetadataRequest: UpdateMetadataRequest,
brokerId: Int,
stateChangeLogger: StateChangeLogger) {
- inLock(partitionMetadataLock.writeLock()) {
- updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+ inWriteLock(partitionMetadataLock) {
+ aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
removePartitionInfo(tp.topic, tp.partition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7f2278fb/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 6576adf..09bfbce 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
import java.nio._
import charset.Charset
import java.nio.channels._
-import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.{ReadWriteLock, Lock}
import java.lang.management._
import javax.management._
import scala.collection._
@@ -540,13 +540,18 @@ object Utils extends Logging {
def inLock[T](lock: Lock)(fun: => T): T = {
lock.lock()
try {
- return fun
+ fun
} finally {
lock.unlock()
}
}
- //JSON strings need to be escaped based on ECMA-404 standard http://json.org
+ def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.readLock)(fun)
+
+ def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun)
+
+
+ //JSON strings need to be escaped based on ECMA-404 standard http://json.org
def JSONEscapeString (s : String) : String = {
s.map {
case '"' => "\\\""