Posted to commits@kafka.apache.org by ju...@apache.org on 2014/07/27 18:59:12 UTC

git commit: kafka-1549; dead brokers coming in the TopicMetadataResponse; patched by Nicu Marasoiu; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk d9e5080df -> 7f2278fb9


kafka-1549; dead brokers coming in the TopicMetadataResponse; patched by Nicu Marasoiu; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7f2278fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7f2278fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7f2278fb

Branch: refs/heads/trunk
Commit: 7f2278fb9de08141f21b017ba66752857dcdeba4
Parents: d9e5080
Author: Nicu Marasoiu <nm...@adobe.com>
Authored: Sun Jul 27 09:59:08 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Jul 27 09:59:08 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 27 ++++++++++----------
 .../main/scala/kafka/server/MetadataCache.scala | 17 ++++++------
 core/src/main/scala/kafka/utils/Utils.scala     | 11 +++++---
 3 files changed, 29 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
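
In short: before this patch, MetadataCache.updateCache only ever added entries to its mutable aliveBrokers map, so brokers that had since gone down were never dropped and kept appearing in TopicMetadataResponse. The patch replaces that map with an immutable snapshot rebuilt from each UpdateMetadataRequest, and introduces inReadLock/inWriteLock helpers in kafka.utils.Utils that the lock call sites in Partition and MetadataCache now use.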


http://git-wip-us.apache.org/repos/asf/kafka/blob/7f2278fb/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f2ca856..134aef9 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,8 +18,7 @@ package kafka.cluster
 
 import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils.{ReplicationUtils, Pool, Time, Logging}
-import kafka.utils.Utils.inLock
+import kafka.utils._
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
 import kafka.server.{OffsetManager, ReplicaManager}
@@ -29,7 +28,7 @@ import kafka.message.ByteBufferMessageSet
 
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.Some
+import kafka.utils.Utils.{inReadLock,inWriteLock}
 import scala.collection._
 
 import com.yammer.metrics.core.Gauge
@@ -73,7 +72,7 @@ class Partition(val topic: String,
   )
 
   def isUnderReplicated(): Boolean = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           inSyncReplicas.size < assignedReplicas.size
@@ -115,7 +114,7 @@ class Partition(val topic: String,
   }
 
   def leaderReplicaIfLocal(): Option[Replica] = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIdOpt match {
         case Some(leaderReplicaId) =>
           if (leaderReplicaId == localBrokerId)
@@ -141,7 +140,7 @@ class Partition(val topic: String,
 
   def delete() {
     // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
-    inLock(leaderIsrUpdateLock.writeLock()) {
+    inWriteLock(leaderIsrUpdateLock) {
       assignedReplicaMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
@@ -156,7 +155,7 @@ class Partition(val topic: String,
   }
 
   def getLeaderEpoch(): Int = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       return this.leaderEpoch
     }
   }
@@ -168,7 +167,7 @@ class Partition(val topic: String,
   def makeLeader(controllerId: Int,
                  partitionStateInfo: PartitionStateInfo, correlationId: Int,
                  offsetManager: OffsetManager): Boolean = {
-    inLock(leaderIsrUpdateLock.writeLock()) {
+    inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -201,7 +200,7 @@ class Partition(val topic: String,
   def makeFollower(controllerId: Int,
                    partitionStateInfo: PartitionStateInfo,
                    correlationId: Int, offsetManager: OffsetManager): Boolean = {
-    inLock(leaderIsrUpdateLock.writeLock()) {
+    inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -235,7 +234,7 @@ class Partition(val topic: String,
   }
 
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
-    inLock(leaderIsrUpdateLock.writeLock()) {
+    inWriteLock(leaderIsrUpdateLock) {
       debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
       val replicaOpt = getReplica(replicaId)
       if(!replicaOpt.isDefined) {
@@ -271,7 +270,7 @@ class Partition(val topic: String,
   }
 
   def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           val numAcks = inSyncReplicas.count(r => {
@@ -315,7 +314,7 @@ class Partition(val topic: String,
   }
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long,  replicaMaxLagMessages: Long) {
-    inLock(leaderIsrUpdateLock.writeLock()) {
+    inWriteLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
@@ -357,7 +356,7 @@ class Partition(val topic: String,
   }
 
   def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       val leaderReplicaOpt = leaderReplicaIfLocal()
       leaderReplicaOpt match {
         case Some(leaderReplica) =>
@@ -400,7 +399,7 @@ class Partition(val topic: String,
   }
 
   override def toString(): String = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       val partitionString = new StringBuilder
       partitionString.append("Topic: " + topic)
       partitionString.append("; Partition: " + partitionId)

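The inReadLock/inWriteLock calls above rely on the two helpers added to kafka.utils.Utils further down in this diff. A minimal, self-contained sketch of the pattern (the object, main method and ISR set below are illustrative, not part of the patch):

    import java.util.concurrent.locks.{Lock, ReadWriteLock, ReentrantReadWriteLock}

    object LockSketch {
      // Run fun while holding the lock, releasing it even if fun throws.
      def inLock[T](lock: Lock)(fun: => T): T = {
        lock.lock()
        try fun finally lock.unlock()
      }

      // Name the intent at the call site instead of reaching into the ReadWriteLock.
      def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = inLock(lock.readLock)(fun)
      def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock(lock.writeLock)(fun)

      def main(args: Array[String]): Unit = {
        val leaderIsrUpdateLock = new ReentrantReadWriteLock()
        var inSyncReplicas = Set(1, 2, 3)
        inWriteLock(leaderIsrUpdateLock) { inSyncReplicas -= 3 }           // exclusive for mutation
        val size = inReadLock(leaderIsrUpdateLock) { inSyncReplicas.size } // shared for reads
        println(size)  // 2
      }
    }
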
http://git-wip-us.apache.org/repos/asf/kafka/blob/7f2278fb/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 7cd40e1..bf81a1a 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -25,7 +25,6 @@ import kafka.utils.Utils._
 import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
 import kafka.common.TopicAndPartition
 import kafka.controller.KafkaController.StateChangeLogger
-import scala.Some
 
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
@@ -34,14 +33,14 @@ import scala.Some
 private[server] class MetadataCache {
   private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
     new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
-  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+  private var aliveBrokers: Map[Int, Broker] = Map()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
 
   def getTopicMetadata(topics: Set[String]) = {
     val isAllTopics = topics.isEmpty
     val topicsRequested = if(isAllTopics) cache.keySet else topics
     val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
-    inLock(partitionMetadataLock.readLock()) {
+    inReadLock(partitionMetadataLock) {
       for (topic <- topicsRequested) {
         if (isAllTopics || cache.contains(topic)) {
           val partitionStateInfos = cache(topic)
@@ -82,15 +81,15 @@ private[server] class MetadataCache {
   }
 
   def getAliveBrokers = {
-    inLock(partitionMetadataLock.readLock()) {
-      aliveBrokers.values.toList
+    inReadLock(partitionMetadataLock) {
+      aliveBrokers.values.toSeq
     }
   }
 
   def addOrUpdatePartitionInfo(topic: String,
                                partitionId: Int,
                                stateInfo: PartitionStateInfo) {
-    inLock(partitionMetadataLock.writeLock()) {
+    inWriteLock(partitionMetadataLock) {
       cache.get(topic) match {
         case Some(infos) => infos.put(partitionId, stateInfo)
         case None => {
@@ -103,7 +102,7 @@ private[server] class MetadataCache {
   }
 
   def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = {
-    inLock(partitionMetadataLock.readLock()) {
+    inReadLock(partitionMetadataLock) {
       cache.get(topic) match {
         case Some(partitionInfos) => partitionInfos.get(partitionId)
         case None => None
@@ -114,8 +113,8 @@ private[server] class MetadataCache {
   def updateCache(updateMetadataRequest: UpdateMetadataRequest,
                   brokerId: Int,
                   stateChangeLogger: StateChangeLogger) {
-    inLock(partitionMetadataLock.writeLock()) {
-      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+    inWriteLock(partitionMetadataLock) {
+      aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
       updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
         if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
           removePartitionInfo(tp.topic, tp.partition)

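The aliveBrokers change above is the actual fix for the dead-brokers-in-TopicMetadataResponse problem: the old code only ever put() brokers into a mutable map, so a broker that had died was never removed and was still returned by getAliveBrokers. Rebuilding an immutable snapshot from every UpdateMetadataRequest keeps the cache in line with the controller's current view. A minimal sketch of the difference (the Broker case class and update sequences below are illustrative, not the real kafka.cluster.Broker):

    import scala.collection.mutable

    case class Broker(id: Int, host: String)

    object AliveBrokersSketch {
      def main(args: Array[String]): Unit = {
        val update1 = Seq(Broker(0, "b0"), Broker(1, "b1"))
        val update2 = Seq(Broker(0, "b0"))             // broker 1 has died

        // Old behaviour: accumulate-only, so the dead broker lingers.
        val accumulated = mutable.HashMap[Int, Broker]()
        update1.foreach(b => accumulated.put(b.id, b))
        update2.foreach(b => accumulated.put(b.id, b))
        println(accumulated.keySet.toList.sorted)      // List(0, 1) -- broker 1 still "alive"

        // New behaviour: replace the whole snapshot on each update.
        var aliveBrokers: Map[Int, Broker] = Map()
        aliveBrokers = update1.map(b => (b.id, b)).toMap
        aliveBrokers = update2.map(b => (b.id, b)).toMap
        println(aliveBrokers.keySet.toList.sorted)     // List(0)
      }
    }
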
http://git-wip-us.apache.org/repos/asf/kafka/blob/7f2278fb/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 6576adf..09bfbce 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.nio._
 import charset.Charset
 import java.nio.channels._
-import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.{ReadWriteLock, Lock}
 import java.lang.management._
 import javax.management._
 import scala.collection._
@@ -540,13 +540,18 @@ object Utils extends Logging {
   def inLock[T](lock: Lock)(fun: => T): T = {
     lock.lock()
     try {
-       return fun
+      fun
     } finally {
       lock.unlock()
     }
   }
 
-    //JSON strings need to be escaped based on ECMA-404 standard http://json.org
+  def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.readLock)(fun)
+
+  def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun)
+
+
+  //JSON strings need to be escaped based on ECMA-404 standard http://json.org
   def JSONEscapeString (s : String) : String = {
     s.map {
       case '"'  => "\\\""