You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/25 23:45:10 UTC
kafka git commit: KAFKA-3076; BrokerChangeListener should log the brokers in order
Repository: kafka
Updated Branches:
refs/heads/trunk fa6b90f97 -> 9f21837e9
KAFKA-3076; BrokerChangeListener should log the brokers in order
Author: Konrad <ko...@gmail.com>
Author: konradkalita <ko...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #749 from konradkalita/kafka-3076
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f21837e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f21837e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f21837e
Branch: refs/heads/trunk
Commit: 9f21837e9925cf768c95a29423f9481b50dbe21d
Parents: fa6b90f
Author: Konrad <ko...@gmail.com>
Authored: Mon Jan 25 14:45:09 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 25 14:45:09 2016 -0800
----------------------------------------------------------------------
.../scala/kafka/controller/ReplicaStateMachine.scala | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9f21837e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 7ebece7..2fd8b95 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -352,7 +352,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
class BrokerChangeListener() extends IZkChildListener with Logging {
this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
- info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
+ info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
ControllerStats.leaderElectionTimer.time {
@@ -364,14 +364,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
controllerContext.liveBrokers = curBrokers
+ val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
+ val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
+ val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
- .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
+ .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
if(newBrokerIds.size > 0)
- controller.onBrokerStartup(newBrokerIds.toSeq)
+ controller.onBrokerStartup(newBrokerIdsSorted)
if(deadBrokerIds.size > 0)
- controller.onBrokerFailure(deadBrokerIds.toSeq)
+ controller.onBrokerFailure(deadBrokerIdsSorted)
} catch {
case e: Throwable => error("Error while handling broker changes", e)
}