You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/04 14:52:19 UTC
kafka git commit: KAFKA-2722; Improve ISR change propagation.
Repository: kafka
Updated Branches:
refs/heads/trunk 98db5ea94 -> 70a7d5786
KAFKA-2722; Improve ISR change propagation.
The patch has two changes:
1. Fixed a bug in the controller that caused it to send an UpdateMetadataRequest for all the partitions in the cluster.
2. Uses the following rules to propagate ISR changes: 1) if there are ISR changes pending propagation and the last ISR change was more than five seconds ago, propagate the changes. 2) if there was an ISR change at time T within the last five seconds, delay the propagation until T + 5s. 3) if the last propagation was more than 1 minute ago, ignore rule No. 2 and propagate the ISR changes if there are changes pending propagation.
This algorithm avoids a fixed configuration for the ISR propagation interval, as we discussed in KIP-29.
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #402 from becketqin/KAFKA-2722
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/70a7d578
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/70a7d578
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/70a7d578
Branch: refs/heads/trunk
Commit: 70a7d5786cc3f04b5f3d964eb1fd1d826e9b9e0f
Parents: 98db5ea
Author: Jiangjie Qin <be...@gmail.com>
Authored: Wed Nov 4 05:52:16 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 4 05:52:16 2015 -0800
----------------------------------------------------------------------
.../kafka/controller/KafkaController.scala | 6 +++--
.../scala/kafka/server/ReplicaManager.scala | 24 ++++++++++++++++----
.../integration/BaseTopicMetadataTest.scala | 2 +-
3 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/70a7d578/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0a1a684..7c03a24 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1354,8 +1354,10 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
try {
val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
- controller.updateLeaderAndIsrCache(topicAndPartitions)
- processUpdateNotifications(topicAndPartitions)
+ if (topicAndPartitions.nonEmpty) {
+ controller.updateLeaderAndIsrCache(topicAndPartitions)
+ processUpdateNotifications(topicAndPartitions)
+ }
} finally {
// delete processed children
childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
http://git-wip-us.apache.org/repos/asf/kafka/blob/70a7d578/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0413b1a..89f2462 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -18,7 +18,7 @@ package kafka.server
import java.io.{File, IOException}
import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import com.yammer.metrics.core.Gauge
import kafka.api._
@@ -29,7 +29,6 @@ import kafka.log.{LogAppendInfo, LogManager}
import kafka.message.{ByteBufferMessageSet, MessageSet}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.{Time => JTime}
@@ -93,6 +92,8 @@ case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
+ val IsrChangePropagationBlackOut = 5000L
+ val IsrChangePropagationInterval = 60000L
}
class ReplicaManager(val config: KafkaConfig,
@@ -116,6 +117,8 @@ class ReplicaManager(val config: KafkaConfig,
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
val stateChangeLogger = KafkaController.stateChangeLogger
private val isrChangeSet: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
+ private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
+ private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
@@ -157,14 +160,25 @@ class ReplicaManager(val config: KafkaConfig,
def recordIsrChange(topicAndPartition: TopicAndPartition) {
isrChangeSet synchronized {
isrChangeSet += topicAndPartition
+ lastIsrChangeMs.set(System.currentTimeMillis())
}
}
-
+ /**
+ * This function periodically runs to see if ISR needs to be propagated. It propagates ISR when:
+ * 1. There is ISR change not propagated yet.
+ * 2. There is no ISR Change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
+ * This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming controller and
+ * other brokers when large amount of ISR change occurs.
+ */
def maybePropagateIsrChanges() {
+ val now = System.currentTimeMillis()
isrChangeSet synchronized {
- if (isrChangeSet.nonEmpty) {
+ if (isrChangeSet.nonEmpty &&
+ (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
+ lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
isrChangeSet.clear()
+ lastIsrPropagationMs.set(now)
}
}
}
@@ -196,7 +210,7 @@ class ReplicaManager(val config: KafkaConfig,
def startup() {
// start ISR expiration thread
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
- scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 5000, unit = TimeUnit.MILLISECONDS)
+ scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
}
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/70a7d578/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 05dc0bc..a621efc 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -210,7 +210,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
metadata.topicsMetadata.head.partitionsMetadata.head.isr
else
- ""), 6000L)
+ ""), 8000L)
})
}