You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/23 21:52:10 UTC

kafka git commit: KAFKA-3242: minor rename / logging change to Controller

Repository: kafka
Updated Branches:
  refs/heads/trunk 3358e1682 -> 525f4c81d


KAFKA-3242: minor rename / logging change to Controller

KAFKA-3242: minor rename / logging change so that references to 'adding partitions' now indicate 'modifying partitions'

Author: Ben Stopford <be...@gmail.com>

Reviewers: Grant Henke

Closes #924 from benstopford/small_changes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/525f4c81
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/525f4c81
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/525f4c81

Branch: refs/heads/trunk
Commit: 525f4c81db8f0dfd600b98f68cbb46e78bd76349
Parents: 3358e16
Author: Ben Stopford <be...@gmail.com>
Authored: Tue Feb 23 12:52:04 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Feb 23 12:52:04 2016 -0800

----------------------------------------------------------------------
 .../kafka/controller/PartitionStateMachine.scala  | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/525f4c81/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 73b173e..ec03b84 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -51,7 +51,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   private val topicChangeListener = new TopicChangeListener()
   private val deleteTopicsListener = new DeleteTopicsListener()
-  private val addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty
+  private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
   private val stateChangeLogger = KafkaController.stateChangeLogger
 
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
@@ -82,11 +82,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   // de-register topic and partition change listeners
   def deregisterListeners() {
     deregisterTopicChangeListener()
-    addPartitionsListener.foreach {
+    partitionModificationsListeners.foreach {
       case (topic, listener) =>
         zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener)
     }
-    addPartitionsListener.clear()
+    partitionModificationsListeners.clear()
     if(controller.config.deleteTopicEnable)
       deregisterDeleteTopicListener()
   }
@@ -379,13 +379,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   def registerPartitionChangeListener(topic: String) = {
-    addPartitionsListener.put(topic, new AddPartitionsListener(topic))
-    zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic))
+    partitionModificationsListeners.put(topic, new PartitionModificationsListener(topic))
+    zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
   }
 
   def deregisterPartitionChangeListener(topic: String) = {
-    zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic))
-    addPartitionsListener.remove(topic)
+    zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
+    partitionModificationsListeners.remove(topic)
   }
 
   private def registerDeleteTopicListener() = {
@@ -497,7 +497,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
-  class AddPartitionsListener(topic: String) extends IZkDataListener with Logging {
+  class PartitionModificationsListener(topic: String) extends IZkDataListener with Logging {
 
     this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "
 
@@ -505,7 +505,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     def handleDataChange(dataPath : String, data: Object) {
       inLock(controllerContext.controllerLock) {
         try {
-          info("Add Partition triggered " + data.toString + " for path " + dataPath)
+          info(s"Partition modification triggered $data for path $dataPath")
           val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
           val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
             !controllerContext.partitionReplicaAssignment.contains(p._1))