You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/04 14:52:19 UTC

kafka git commit: KAFKA-2722; Improve ISR change propagation.

Repository: kafka
Updated Branches:
  refs/heads/trunk 98db5ea94 -> 70a7d5786


KAFKA-2722; Improve ISR change propagation.

The patch has two changes:
1. fixed a bug in controller that it sends UpdateMetadataRequest of all the partitions in the cluster.
2. Uses the following rules to propagate ISR change: 1) if there are ISR changes pending propagation and the last ISR change is more than five seconds ago, propagate the changes. 2) if there is ISR change at T in the recent five seconds, delay the propagation until T + 5s. 3) if the last propagation is more than 1 min ago, ignore rule No.2 and propagate ISR change if there are changes pending propagation.

This algorithm avoids a fixed configuration of ISR propagation interval as we discussed about in KIP-29.

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #402 from becketqin/KAFKA-2722


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/70a7d578
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/70a7d578
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/70a7d578

Branch: refs/heads/trunk
Commit: 70a7d5786cc3f04b5f3d964eb1fd1d826e9b9e0f
Parents: 98db5ea
Author: Jiangjie Qin <be...@gmail.com>
Authored: Wed Nov 4 05:52:16 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 4 05:52:16 2015 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      |  6 +++--
 .../scala/kafka/server/ReplicaManager.scala     | 24 ++++++++++++++++----
 .../integration/BaseTopicMetadataTest.scala     |  2 +-
 3 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/70a7d578/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0a1a684..7c03a24 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1354,8 +1354,10 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
       val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
       try {
         val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
-        controller.updateLeaderAndIsrCache(topicAndPartitions)
-        processUpdateNotifications(topicAndPartitions)
+        if (topicAndPartitions.nonEmpty) {
+          controller.updateLeaderAndIsrCache(topicAndPartitions)
+          processUpdateNotifications(topicAndPartitions)
+        }
       } finally {
         // delete processed children
         childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(

http://git-wip-us.apache.org/repos/asf/kafka/blob/70a7d578/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0413b1a..89f2462 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -18,7 +18,7 @@ package kafka.server
 
 import java.io.{File, IOException}
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
@@ -29,7 +29,6 @@ import kafka.log.{LogAppendInfo, LogManager}
 import kafka.message.{ByteBufferMessageSet, MessageSet}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.{Time => JTime}
@@ -93,6 +92,8 @@ case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int
 
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
 }
 
 class ReplicaManager(val config: KafkaConfig,
@@ -116,6 +117,8 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
   private val isrChangeSet: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
+  private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
+  private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
 
   val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
     purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
@@ -157,14 +160,25 @@ class ReplicaManager(val config: KafkaConfig,
   def recordIsrChange(topicAndPartition: TopicAndPartition) {
     isrChangeSet synchronized {
       isrChangeSet += topicAndPartition
+      lastIsrChangeMs.set(System.currentTimeMillis())
     }
   }
-
+  /**
+   * This function periodically runs to see if ISR needs to be propagated. It propagates ISR when:
+   * 1. There is ISR change not propagated yet.
+   * 2. There is no ISR Change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
+   * This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming controller and
+   * other brokers when large amount of ISR change occurs.
+   */
   def maybePropagateIsrChanges() {
+    val now = System.currentTimeMillis()
     isrChangeSet synchronized {
-      if (isrChangeSet.nonEmpty) {
+      if (isrChangeSet.nonEmpty &&
+        (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
+          lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
         ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
         isrChangeSet.clear()
+        lastIsrPropagationMs.set(now)
       }
     }
   }
@@ -196,7 +210,7 @@ class ReplicaManager(val config: KafkaConfig,
   def startup() {
     // start ISR expiration thread
     scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
-    scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 5000, unit = TimeUnit.MILLISECONDS)
+    scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
   }
 
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/70a7d578/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 05dc0bc..a621efc 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -210,7 +210,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
                                 metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
                               metadata.topicsMetadata.head.partitionsMetadata.head.isr
                             else
-                              ""), 6000L)
+                              ""), 8000L)
     })
   }