You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/23 21:52:10 UTC
kafka git commit: KAFKA-3242: minor rename / logging change to
Controller
Repository: kafka
Updated Branches:
refs/heads/trunk 3358e1682 -> 525f4c81d
KAFKA-3242: minor rename / logging change to Controller
KAFKA-3242: minor rename / logging change to references to 'adding partitions' to indicate 'modifying partitions'
Author: Ben Stopford <be...@gmail.com>
Reviewers: Grant Henke
Closes #924 from benstopford/small_changes
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/525f4c81
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/525f4c81
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/525f4c81
Branch: refs/heads/trunk
Commit: 525f4c81db8f0dfd600b98f68cbb46e78bd76349
Parents: 3358e16
Author: Ben Stopford <be...@gmail.com>
Authored: Tue Feb 23 12:52:04 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Feb 23 12:52:04 2016 -0800
----------------------------------------------------------------------
.../kafka/controller/PartitionStateMachine.scala | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/525f4c81/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 73b173e..ec03b84 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -51,7 +51,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
private val topicChangeListener = new TopicChangeListener()
private val deleteTopicsListener = new DeleteTopicsListener()
- private val addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty
+ private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
private val stateChangeLogger = KafkaController.stateChangeLogger
this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
@@ -82,11 +82,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
// de-register topic and partition change listeners
def deregisterListeners() {
deregisterTopicChangeListener()
- addPartitionsListener.foreach {
+ partitionModificationsListeners.foreach {
case (topic, listener) =>
zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener)
}
- addPartitionsListener.clear()
+ partitionModificationsListeners.clear()
if(controller.config.deleteTopicEnable)
deregisterDeleteTopicListener()
}
@@ -379,13 +379,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
def registerPartitionChangeListener(topic: String) = {
- addPartitionsListener.put(topic, new AddPartitionsListener(topic))
- zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic))
+ partitionModificationsListeners.put(topic, new PartitionModificationsListener(topic))
+ zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
}
def deregisterPartitionChangeListener(topic: String) = {
- zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic))
- addPartitionsListener.remove(topic)
+ zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
+ partitionModificationsListeners.remove(topic)
}
private def registerDeleteTopicListener() = {
@@ -497,7 +497,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
}
- class AddPartitionsListener(topic: String) extends IZkDataListener with Logging {
+ class PartitionModificationsListener(topic: String) extends IZkDataListener with Logging {
this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "
@@ -505,7 +505,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
def handleDataChange(dataPath : String, data: Object) {
inLock(controllerContext.controllerLock) {
try {
- info("Add Partition triggered " + data.toString + " for path " + dataPath)
+ info(s"Partition modification triggered $data for path $dataPath")
val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
!controllerContext.partitionReplicaAssignment.contains(p._1))