You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/06/03 23:06:46 UTC
git commit: KAFKA-928 new topics may not be processed after ZK
session expiration in controller; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 4850519a2 -> 658427638
KAFKA-928 new topics may not be processed after ZK session expiration in controller; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65842763
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65842763
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65842763
Branch: refs/heads/0.8
Commit: 658427638f1b00ae1d284e579b4631f7119e54cc
Parents: 4850519
Author: Neha Narkhede <ne...@apache.org>
Authored: Mon Jun 3 14:06:08 2013 -0700
Committer: Neha Narkhede <ne...@apache.org>
Committed: Mon Jun 3 14:06:08 2013 -0700
----------------------------------------------------------------------
.../kafka/controller/PartitionStateMachine.scala | 5 ++---
.../kafka/controller/ReplicaStateMachine.scala | 5 ++---
2 files changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/65842763/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 0f5ebde..deebed0 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -46,7 +46,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
controllerId, controller.clientId)
private val hasStarted = new AtomicBoolean(false)
- private val hasShutdown = new AtomicBoolean(false)
private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -74,7 +73,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* Invoked on controller shutdown.
*/
def shutdown() {
- hasShutdown.compareAndSet(false, true)
+ hasStarted.set(false)
partitionState.clear()
}
@@ -358,7 +357,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
@throws(classOf[Exception])
def handleChildChange(parentPath : String, children : java.util.List[String]) {
controllerContext.controllerLock synchronized {
- if (!hasShutdown.get) {
+ if (hasStarted.get) {
try {
debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
val currentChildren = JavaConversions.asBuffer(children).toSet
http://git-wip-us.apache.org/repos/asf/kafka/blob/65842763/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5283fcd..4fbb28e 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -45,7 +45,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
controllerId, controller.clientId)
private val hasStarted = new AtomicBoolean(false)
- private val hasShutdown = new AtomicBoolean(false)
this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -73,7 +72,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
* Invoked on controller shutdown.
*/
def shutdown() {
- hasShutdown.compareAndSet(false, true)
+ hasStarted.set(false)
replicaState.clear()
}
@@ -252,7 +251,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
ControllerStats.leaderElectionTimer.time {
info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
controllerContext.controllerLock synchronized {
- if (!hasShutdown.get) {
+ if (hasStarted.get) {
try {
val curBrokerIds = currentBrokerList.map(_.toInt).toSet
val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds