You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/06/20 22:23:12 UTC

git commit: kafka-947; isr-expiration-thread may block LeaderAndIsr request for a relatively long period; patched by Jun Rao; reviewed by Joel Koshy

Updated Branches:
  refs/heads/0.8 7b43f0132 -> cc6027b32


kafka-947; isr-expiration-thread may block LeaderAndIsr request for a relatively long period; patched by Jun Rao; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cc6027b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cc6027b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cc6027b3

Branch: refs/heads/0.8
Commit: cc6027b3279ac5cd8908d01d35511c93716fbe60
Parents: 7b43f01
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Jun 20 13:22:56 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jun 20 13:22:56 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/ReplicaManager.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cc6027b3/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9d41e82..d885ba1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -282,9 +282,11 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
+    var curLeaderPartitions: List[Partition] = null
     leaderPartitionsLock synchronized {
-      leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
+      curLeaderPartitions = leaderPartitions.toList
     }
+    curLeaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
   }
 
   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {