You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/06/20 22:23:12 UTC
git commit: kafka-947; isr-expiration-thread may block LeaderAndIsr request for a relatively long period; patched by Jun Rao; reviewed by Joel Koshy
Updated Branches:
refs/heads/0.8 7b43f0132 -> cc6027b32
kafka-947; isr-expiration-thread may block LeaderAndIsr request for a relatively long period; patched by Jun Rao; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cc6027b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cc6027b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cc6027b3
Branch: refs/heads/0.8
Commit: cc6027b3279ac5cd8908d01d35511c93716fbe60
Parents: 7b43f01
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Jun 20 13:22:56 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jun 20 13:22:56 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/server/ReplicaManager.scala | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cc6027b3/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9d41e82..d885ba1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -282,9 +282,11 @@ class ReplicaManager(val config: KafkaConfig,
private def maybeShrinkIsr(): Unit = {
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
+ var curLeaderPartitions: List[Partition] = null
leaderPartitionsLock synchronized {
- leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
+ curLeaderPartitions = leaderPartitions.toList
}
+ curLeaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
}
def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {