You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/18 10:37:35 UTC

kafka git commit: KAFKA-5258; Move all partition and replica state transition rules into their states

Repository: kafka
Updated Branches:
  refs/heads/trunk 816578b5c -> 5aaaba7ff


KAFKA-5258; Move all partition and replica state transition rules into their states

Today PartitionStateMachine and ReplicaStateMachine define and assert the
valid state transitions inline for each state. It's cleaner to move the
transition rules into ReplicaState/PartitionState and do the assertion once
at the top of handleStateChange.

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3071 from onurkaraman/KAFKA-5258


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5aaaba7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5aaaba7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5aaaba7f

Branch: refs/heads/trunk
Commit: 5aaaba7ffe51a3216262811f24e6ccf10488b5e1
Parents: 816578b
Author: Onur Karaman <ok...@linkedin.com>
Authored: Thu May 18 11:31:47 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 18 11:35:51 2017 +0100

----------------------------------------------------------------------
 .../controller/PartitionStateMachine.scala      | 44 ++++++++-----
 .../kafka/controller/ReplicaStateMachine.scala  | 65 +++++++++++++-------
 2 files changed, 72 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5aaaba7f/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 5bed329..4cffc13 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -146,10 +146,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     val topicAndPartition = TopicAndPartition(topic, partition)
     val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
     try {
+      assertValidTransition(topicAndPartition, targetState)
       targetState match {
         case NewPartition =>
-          // pre: partition did not exist before this
-          assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
           partitionState.put(topicAndPartition, NewPartition)
           val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
           stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
@@ -157,7 +156,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
                                             assignedReplicas))
           // post: partition has been assigned replicas
         case OnlinePartition =>
-          assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
           partitionState(topicAndPartition) match {
             case NewPartition =>
               // initialize leader and isr path for new partition
@@ -174,16 +172,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
                                     .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
            // post: partition has a leader
         case OfflinePartition =>
-          // pre: partition should be in New or Online state
-          assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
           stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
                                     .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
           partitionState.put(topicAndPartition, OfflinePartition)
           // post: partition has no alive leader
         case NonExistentPartition =>
-          // pre: partition should be in Offline state
-          assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
           stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
                                     .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
           partitionState.put(topicAndPartition, NonExistentPartition)
@@ -217,11 +211,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
-  private def assertValidPreviousStates(topicAndPartition: TopicAndPartition, fromStates: Seq[PartitionState],
-                                        targetState: PartitionState) {
-    if(!fromStates.contains(partitionState(topicAndPartition)))
+  private def assertValidTransition(topicAndPartition: TopicAndPartition, targetState: PartitionState): Unit = {
+    if (!targetState.validPreviousStates.contains(partitionState(topicAndPartition)))
       throw new IllegalStateException("Partition %s should be in the %s states before moving to %s state"
-        .format(topicAndPartition, fromStates.mkString(","), targetState) + ". Instead it is in %s state"
+        .format(topicAndPartition, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state"
         .format(partitionState(topicAndPartition)))
   }
 
@@ -351,8 +344,27 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 }
 
-sealed trait PartitionState { def state: Byte }
-case object NewPartition extends PartitionState { val state: Byte = 0 }
-case object OnlinePartition extends PartitionState { val state: Byte = 1 }
-case object OfflinePartition extends PartitionState { val state: Byte = 2 }
-case object NonExistentPartition extends PartitionState { val state: Byte = 3 }
+sealed trait PartitionState {
+  def state: Byte
+  def validPreviousStates: Set[PartitionState]
+}
+
+case object NewPartition extends PartitionState {
+  val state: Byte = 0
+  val validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
+}
+
+case object OnlinePartition extends PartitionState {
+  val state: Byte = 1
+  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
+}
+
+case object OfflinePartition extends PartitionState {
+  val state: Byte = 2
+  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
+}
+
+case object NonExistentPartition extends PartitionState {
+  val state: Byte = 3
+  val validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aaaba7f/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index c812f4a..0759ed4 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -46,6 +46,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   private val zkUtils = controllerContext.zkUtils
   private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
+
   private val stateChangeLogger = KafkaController.stateChangeLogger
 
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
@@ -136,9 +137,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
     try {
       val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
+      assertValidTransition(partitionAndReplica, targetState)
       targetState match {
         case NewReplica =>
-          assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
           // start replica as a follower to the current leader for its partition
           val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
           leaderIsrAndControllerEpochOpt match {
@@ -156,7 +157,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                                     .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                             targetState))
         case ReplicaDeletionStarted =>
-          assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
           replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
           // send stop replica command
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
@@ -164,17 +164,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
             .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
         case ReplicaDeletionIneligible =>
-          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
           replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
             .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
         case ReplicaDeletionSuccessful =>
-          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
           replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
             .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
         case NonExistentReplica =>
-          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
           // remove this replica from the assigned replicas list for its partition
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
           controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
@@ -182,8 +179,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
             .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
         case OnlineReplica =>
-          assertValidPreviousStates(partitionAndReplica,
-            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
           replicaState(partitionAndReplica) match {
             case NewReplica =>
               // add this replica to the assigned replicas list for its partition
@@ -208,8 +203,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           }
           replicaState.put(partitionAndReplica, OnlineReplica)
         case OfflineReplica =>
-          assertValidPreviousStates(partitionAndReplica,
-            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
           // send stop replica command to the replica so that it stops fetching from the leader
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
           // As an optimization, the controller removes dead replicas from the ISR
@@ -273,11 +266,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet
   }
 
-  private def assertValidPreviousStates(partitionAndReplica: PartitionAndReplica, fromStates: Seq[ReplicaState],
-                                        targetState: ReplicaState) {
-    assert(fromStates.contains(replicaState(partitionAndReplica)),
+  private def assertValidTransition(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState): Unit = {
+    assert(targetState.validPreviousStates.contains(replicaState(partitionAndReplica)),
       "Replica %s should be in the %s states before moving to %s state"
-        .format(partitionAndReplica, fromStates.mkString(","), targetState) +
+        .format(partitionAndReplica, targetState.validPreviousStates.mkString(","), targetState) +
         ". Instead it is in %s state".format(replicaState(partitionAndReplica)))
   }
 
@@ -307,11 +299,42 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   }
 }
 
-sealed trait ReplicaState { def state: Byte }
-case object NewReplica extends ReplicaState { val state: Byte = 1 }
-case object OnlineReplica extends ReplicaState { val state: Byte = 2 }
-case object OfflineReplica extends ReplicaState { val state: Byte = 3 }
-case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4}
-case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5}
-case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6}
-case object NonExistentReplica extends ReplicaState { val state: Byte = 7 }
+sealed trait ReplicaState {
+  def state: Byte
+  def validPreviousStates: Set[ReplicaState]
+}
+
+case object NewReplica extends ReplicaState {
+  val state: Byte = 1
+  val validPreviousStates: Set[ReplicaState] = Set(NonExistentReplica)
+}
+
+case object OnlineReplica extends ReplicaState {
+  val state: Byte = 2
+  val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
+}
+
+case object OfflineReplica extends ReplicaState {
+  val state: Byte = 3
+  val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
+}
+
+case object ReplicaDeletionStarted extends ReplicaState {
+  val state: Byte = 4
+  val validPreviousStates: Set[ReplicaState] = Set(OfflineReplica)
+}
+
+case object ReplicaDeletionSuccessful extends ReplicaState {
+  val state: Byte = 5
+  val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted)
+}
+
+case object ReplicaDeletionIneligible extends ReplicaState {
+  val state: Byte = 6
+  val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted)
+}
+
+case object NonExistentReplica extends ReplicaState {
+  val state: Byte = 7
+  val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionSuccessful)
+}