Posted to commits@kafka.apache.org by li...@apache.org on 2018/09/07 21:18:20 UTC

[kafka] branch trunk updated: KAFKA-6082; Fence zookeeper updates with controller epoch zkVersion

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 297fb39  KAFKA-6082; Fence zookeeper updates with controller epoch zkVersion
297fb39 is described below

commit 297fb396a0038addea06a76fd4ab9a451eb7562e
Author: Zhanxiang (Patrick) Huang <hz...@hotmail.com>
AuthorDate: Fri Sep 7 14:17:49 2018 -0700

    KAFKA-6082; Fence zookeeper updates with controller epoch zkVersion
    
    This PR enforces that the controller can only update zookeeper state after checking the controller epoch zkVersion. The check and the zookeeper state update are wrapped in a zookeeper multi() operation so that they are performed atomically. This is necessary to resolve issues caused by multiple live controllers (e.g. an old controller updating zookeeper state before resigning, which is possible during controller failover given the single-threaded event queue model we have). A conceptual sketch of this fencing pattern follows the diffstat below.
    
    This PR includes the following changes:
    - Add MultiOp request and response support in ZooKeeperClient
    - Ensure all zookeeper updates made by the controller are protected by a check on the current controller epoch zkVersion
    - Update test cases in KafkaZkClientTest to cover mismatched controller epoch zkVersions
    
    Tests Done:
    - Unit tests (including updated tests for mismatched controller epoch zkVersion)
    - Existing integration tests
    
    Author: Zhanxiang (Patrick) Huang <hz...@hotmail.com>
    
    Reviewers: Jun Rao <ju...@gmail.com>, Dong Lin <li...@gmail.com>, Manikumar Reddy O <ma...@gmail.com>
    
    Closes #5101 from hzxa21/KAFKA-6082
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   2 +-
 .../scala/kafka/controller/ControllerContext.scala |   4 +-
 .../kafka/controller/ControllerEventManager.scala  |  10 +-
 .../scala/kafka/controller/KafkaController.scala   | 161 +++++++-------
 .../kafka/controller/PartitionStateMachine.scala   |  11 +-
 .../kafka/controller/ReplicaStateMachine.scala     |   6 +-
 .../kafka/controller/TopicDeletionManager.scala    |   8 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   2 +-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 232 ++++++++++++++++-----
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    | 134 ++++++++----
 .../admin/ReassignPartitionsClusterTest.scala      |   4 +-
 .../controller/ControllerEventManagerTest.scala    |   2 +-
 .../controller/ControllerIntegrationTest.scala     | 153 +++++++++++---
 .../controller/PartitionStateMachineTest.scala     |  16 +-
 .../kafka/controller/ReplicaStateMachineTest.scala |   2 +-
 .../unit/kafka/utils/LogCaptureAppender.scala      |  66 ++++++
 .../unit/kafka/utils/ReplicationUtilsTest.scala    |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   4 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 157 +++++++++-----
 19 files changed, 680 insertions(+), 298 deletions(-)
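
A minimal sketch of the fencing pattern described above, written against the
raw ZooKeeper API rather than Kafka's ZooKeeperClient wrapper. The
fencedSetData helper name and the hard-coded path are illustrative
assumptions, not code from this commit:

    import org.apache.kafka.common.errors.ControllerMovedException
    import org.apache.zookeeper.KeeperException.BadVersionException
    import org.apache.zookeeper.{Op, ZooKeeper}

    import scala.collection.JavaConverters._

    // Hypothetical helper: apply a zookeeper update only if /controller_epoch
    // still has the zkVersion recorded when this controller was elected.
    def fencedSetData(zk: ZooKeeper, path: String, data: Array[Byte],
                      expectedEpochZkVersion: Int): Unit = {
      val ops = Seq(
        // The check op fails the whole multi() with BadVersionException if
        // another controller has since bumped /controller_epoch.
        Op.check("/controller_epoch", expectedEpochZkVersion),
        // The actual state update; -1 matches any version of the target znode.
        Op.setData(path, data, -1)
      )
      try zk.multi(ops.asJava)
      catch {
        case _: BadVersionException =>
          // The epoch znode changed, so this broker is no longer the controller.
          throw new ControllerMovedException("Controller moved to another broker")
      }
    }

In the commit itself this is generalized: KafkaZkClient attaches a
controllerZkVersionCheck to each Create/SetData/Delete request it issues, and
maybeThrowControllerMoveException converts a failed check into a
ControllerMovedException so that the controller event thread can resign via
maybeResign().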

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 22c1508..d76d6d0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -73,7 +73,7 @@ class Partition(val topic: String,
    * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
    * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
    * each partition. */
-  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
+  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
 
   private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index f4671cf..20c3de0 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -28,8 +28,8 @@ class ControllerContext {
   var controllerChannelManager: ControllerChannelManager = null
 
   var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
-  var epoch: Int = KafkaController.InitialControllerEpoch - 1
-  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
+  var epoch: Int = KafkaController.InitialControllerEpoch
+  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
   var allTopics: Set[String] = Set.empty
   private var partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty
   val partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index 13967e0..c93e9e7 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -24,6 +24,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.ShutdownableThread
+import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.utils.Time
 
 import scala.collection._
@@ -32,12 +33,14 @@ object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
 }
 class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
-                             eventProcessedListener: ControllerEvent => Unit) extends KafkaMetricsGroup {
+                             eventProcessedListener: ControllerEvent => Unit,
+                             controllerMovedListener: () => Unit) extends KafkaMetricsGroup {
 
   @volatile private var _state: ControllerState = ControllerState.Idle
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[ControllerEvent]
-  private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
+  // Visible for test
+  private[controller] val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
   private val time = Time.SYSTEM
 
   private val eventQueueTimeHist = newHistogram("EventQueueTimeMs")
@@ -86,6 +89,9 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
               controllerEvent.process()
             }
           } catch {
+            case e: ControllerMovedException =>
+              info(s"Controller moved to another broker when processing $controllerEvent.", e)
+              controllerMovedListener()
             case e: Throwable => error(s"Error processing event $controllerEvent", e)
           }
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 286768f..379e66d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -35,14 +35,14 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, LeaderAndIsrResponse, StopReplicaResponse}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException
-import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
+import org.apache.zookeeper.KeeperException.Code
 
 import scala.collection._
 import scala.util.Try
 
 object KafkaController extends Logging {
-  val InitialControllerEpoch = 1
-  val InitialControllerEpochZkVersion = 1
+  val InitialControllerEpoch = 0
+  val InitialControllerEpochZkVersion = 0
 
   /**
    * ControllerEventThread will shutdown once it sees this event
@@ -52,6 +52,12 @@ object KafkaController extends Logging {
     override def process(): Unit = ()
   }
 
+  // Used only by test
+  private[controller] case class AwaitOnLatch(latch: CountDownLatch) extends ControllerEvent {
+    override def state: ControllerState = ControllerState.ControllerChange
+    override def process(): Unit = latch.await()
+  }
+
 }
 
 class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo,
@@ -70,7 +76,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   // visible for testing
   private[controller] val eventManager = new ControllerEventManager(config.brokerId,
-    controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics())
+    controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => maybeResign())
 
   val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
@@ -214,21 +220,15 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   /**
    * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
    * It does the following things on the become-controller state change -
-   * 1. Registers controller epoch changed listener
-   * 2. Increments the controller epoch
-   * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
+   * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
    *    leaders for all existing partitions.
-   * 4. Starts the controller's channel manager
-   * 5. Starts the replica state machine
-   * 6. Starts the partition state machine
+   * 2. Starts the controller's channel manager
+   * 3. Starts the replica state machine
+   * 4. Starts the partition state machine
    * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
    * This ensures another controller election will be triggered and there will always be an actively serving controller
    */
   private def onControllerFailover() {
-    info("Reading controller epoch from ZooKeeper")
-    readControllerEpochFromZooKeeper()
-    info("Incrementing controller epoch in ZooKeeper")
-    incrementControllerEpoch()
     info("Registering handlers")
 
     // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
@@ -239,9 +239,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
 
     info("Deleting log dir event notifications")
-    zkClient.deleteLogDirEventNotifications()
+    zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)
     info("Deleting isr change notifications")
-    zkClient.deleteIsrChangeNotifications()
+    zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion)
     info("Initializing controller context")
     initializeControllerContext()
     info("Fetching topic deletions in progress")
@@ -599,6 +599,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
               topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
               onPartitionReassignment(tp, reassignedPartitionContext)
             } catch {
+              case e: ControllerMovedException =>
+                error(s"Error completing reassignment of partition $tp because controller has moved to another broker", e)
+                throw e
               case e: Throwable =>
                 error(s"Error completing reassignment of partition $tp", e)
                 // remove the partition from the admin path to unblock the admin client
@@ -619,41 +622,15 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     try {
       partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
     } catch {
+      case e: ControllerMovedException =>
+        error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")} because controller has moved to another broker.", e)
+        throw e
       case e: Throwable => error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")}", e)
     } finally {
       removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
     }
   }
 
-  private def incrementControllerEpoch(): Unit = {
-    val newControllerEpoch = controllerContext.epoch + 1
-    val setDataResponse = zkClient.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
-    setDataResponse.resultCode match {
-      case Code.OK =>
-        controllerContext.epochZkVersion = setDataResponse.stat.getVersion
-        controllerContext.epoch = newControllerEpoch
-      case Code.NONODE =>
-        // if path doesn't exist, this is the first controller whose epoch should be 1
-        // the following call can still fail if another controller gets elected between checking if the path exists and
-        // trying to create the controller epoch path
-        val createResponse = zkClient.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
-        createResponse.resultCode match {
-          case Code.OK =>
-            controllerContext.epoch = KafkaController.InitialControllerEpoch
-            controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
-          case Code.NODEEXISTS =>
-            throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
-          case _ =>
-            val exception = createResponse.resultException.get
-            error("Error while incrementing controller epoch", exception)
-            throw exception
-        }
-      case _ =>
-        throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
-    }
-    info(s"Epoch incremented to ${controllerContext.epoch}")
-  }
-
   private def initializeControllerContext() {
     // update controller cache with delete topic information
     controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
@@ -783,7 +760,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   private def updateAssignedReplicasForPartition(partition: TopicPartition,
                                                  replicas: Seq[Int]) {
     controllerContext.updatePartitionReplicaAssignment(partition, replicas)
-    val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, controllerContext.partitionReplicaAssignmentForTopic(partition.topic))
+    val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, controllerContext.partitionReplicaAssignmentForTopic(partition.topic), controllerContext.epochZkVersion)
     setDataResponse.resultCode match {
       case Code.OK =>
         info(s"Updated assigned replicas for partition $partition being reassigned to ${replicas.mkString(",")}")
@@ -844,16 +821,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     controllerContext.partitionsBeingReassigned.values.foreach(_.unregisterReassignIsrChangeHandler(zkClient))
   }
 
-  private def readControllerEpochFromZooKeeper() {
-    // initialize the controller epoch and zk version by reading from zookeeper
-    val epochAndStatOpt = zkClient.getControllerEpoch
-    epochAndStatOpt.foreach { case (epoch, stat) =>
-      controllerContext.epoch = epoch
-      controllerContext.epochZkVersion = stat.getVersion
-      info(s"Initialized controller epoch to ${controllerContext.epoch} and zk version ${controllerContext.epochZkVersion}")
-    }
-  }
-
   /**
    * Remove partition from partitions being reassigned in ZooKeeper and ControllerContext. If the partition reassignment
    * is complete (i.e. there is no other partition with a reassignment in progress), the reassign_partitions znode
@@ -874,12 +841,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // write the new list to zookeeper
     if (updatedPartitionsBeingReassigned.isEmpty) {
       info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}")
-      zkClient.deletePartitionReassignment()
+      zkClient.deletePartitionReassignment(controllerContext.epochZkVersion)
       // Ensure we detect future reassignments
       eventManager.put(PartitionReassignment)
     } else {
       val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
-      try zkClient.setOrCreatePartitionReassignment(reassignment)
+      try zkClient.setOrCreatePartitionReassignment(reassignment, controllerContext.epochZkVersion)
       catch {
         case e: KeeperException => throw new AdminOperationException(e)
       }
@@ -902,7 +869,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       }
     }
     if (!isTriggeredByAutoRebalance) {
-      zkClient.deletePreferredReplicaElection()
+      zkClient.deletePreferredReplicaElection(controllerContext.epochZkVersion)
       // Ensure we detect future preferred replica leader elections
       eventManager.put(PreferredReplicaLeaderElection)
     }
@@ -955,7 +922,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
           val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
           // update the new leadership decision in zookeeper or retry
           val UpdateLeaderAndIsrResult(successfulUpdates, _, failedUpdates) =
-            zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch)
+            zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion)
           if (successfulUpdates.contains(partition)) {
             val finalLeaderAndIsr = successfulUpdates(partition)
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
@@ -1204,13 +1171,32 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   }
 
   private def triggerControllerMove(): Unit = {
-    onControllerResignation()
-    activeControllerId = -1
-    zkClient.deleteController()
+    activeControllerId = zkClient.getControllerId.getOrElse(-1)
+    if (!isActive) {
+      warn("Controller has already moved when trying to trigger controller movement")
+      return
+    }
+    try {
+      val expectedControllerEpochZkVersion = controllerContext.epochZkVersion
+      activeControllerId = -1
+      onControllerResignation()
+      zkClient.deleteController(expectedControllerEpochZkVersion)
+    } catch {
+      case _: ControllerMovedException =>
+        warn("Controller has already moved when trying to trigger controller movement")
+    }
+  }
+
+  private def maybeResign(): Unit = {
+    val wasActiveBeforeChange = isActive
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+    activeControllerId = zkClient.getControllerId.getOrElse(-1)
+    if (wasActiveBeforeChange && !isActive) {
+      onControllerResignation()
+    }
   }
 
   private def elect(): Unit = {
-    val timestamp = time.milliseconds
     activeControllerId = zkClient.getControllerId.getOrElse(-1)
     /*
      * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
@@ -1223,22 +1209,27 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
 
     try {
-      zkClient.registerController(config.brokerId, timestamp)
-      info(s"${config.brokerId} successfully elected as the controller")
+      val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
+      controllerContext.epoch = epoch
+      controllerContext.epochZkVersion = epochZkVersion
       activeControllerId = config.brokerId
+
+      info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
+        s"and epoch zk version is now ${controllerContext.epochZkVersion}")
+
       onControllerFailover()
     } catch {
-      case _: NodeExistsException =>
-        // If someone else has written the path, then
-        activeControllerId = zkClient.getControllerId.getOrElse(-1)
+      case e: ControllerMovedException =>
+        maybeResign()
 
         if (activeControllerId != -1)
-          debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}")
+          debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
         else
-          warn("A controller has been elected but just resigned, this will result in another round of election")
+          warn("A controller has been elected but just resigned, this will result in another round of election", e)
 
-      case e2: Throwable =>
-        error(s"Error while electing or becoming controller on broker ${config.brokerId}", e2)
+      case t: Throwable =>
+        error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
+          s"Trigger controller movement immediately", t)
         triggerControllerMove()
     }
   }
@@ -1321,7 +1312,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         onBrokerLogDirFailure(brokerIds)
       } finally {
         // delete processed children
-        zkClient.deleteLogDirEventNotifications(sequenceNumbers)
+        zkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion)
       }
     }
   }
@@ -1336,7 +1327,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       val existingPartitionReplicaAssignment = newPartitionReplicaAssignment.filter(p =>
         existingPartitions.contains(p._1.partition.toString))
 
-      zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment)
+      zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment, controllerContext.epochZkVersion)
     }
 
     override def process(): Unit = {
@@ -1377,7 +1368,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
       if (nonExistentTopics.nonEmpty) {
         warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
-        zkClient.deleteTopicDeletions(nonExistentTopics.toSeq)
+        zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
       }
       topicsToBeDeleted --= nonExistentTopics
       if (config.deleteTopicEnable) {
@@ -1396,7 +1387,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       } else {
         // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
         info(s"Removing $topicsToBeDeleted since delete topic is disabled")
-        zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq)
+        zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
       }
     }
   }
@@ -1469,7 +1460,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         }
       } finally {
         // delete the notifications
-        zkClient.deleteIsrChangeNotifications(sequenceNumbers)
+        zkClient.deleteIsrChangeNotifications(sequenceNumbers, controllerContext.epochZkVersion)
       }
     }
 
@@ -1504,12 +1495,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     override def state = ControllerState.ControllerChange
 
     override def process(): Unit = {
-      val wasActiveBeforeChange = isActive
-      zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
-      activeControllerId = zkClient.getControllerId.getOrElse(-1)
-      if (wasActiveBeforeChange && !isActive) {
-        onControllerResignation()
-      }
+      maybeResign()
     }
   }
 
@@ -1517,12 +1503,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     override def state = ControllerState.ControllerChange
 
     override def process(): Unit = {
-      val wasActiveBeforeChange = isActive
-      zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
-      activeControllerId = zkClient.getControllerId.getOrElse(-1)
-      if (wasActiveBeforeChange && !isActive) {
-        onControllerResignation()
-      }
+      maybeResign()
       elect()
     }
   }
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 3a0ac19..663ee8d 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -23,6 +23,7 @@ import kafka.utils.Logging
 import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
 
@@ -132,6 +133,9 @@ class PartitionStateMachine(config: KafkaConfig,
         doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
         controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
       } catch {
+        case e: ControllerMovedException =>
+          error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
+          throw e
         case e: Throwable => error(s"Error while moving some partitions to $targetState state", e)
       }
     }
@@ -250,8 +254,11 @@ class PartitionStateMachine(config: KafkaConfig,
       partition -> leaderIsrAndControllerEpoch
     }.toMap
     val createResponses = try {
-      zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
+      zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs, controllerContext.epochZkVersion)
     } catch {
+      case e: ControllerMovedException =>
+        error("Controller moved to another broker when trying to create the topic partition state znode", e)
+        throw e
       case e: Exception =>
         partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
         Seq.empty
@@ -361,7 +368,7 @@ class PartitionStateMachine(config: KafkaConfig,
     val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap
     val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap
     val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
-      adjustedLeaderAndIsrs, controllerContext.epoch)
+      adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
     successfulUpdates.foreach { case (partition, leaderAndIsr) =>
       val replicas = controllerContext.partitionReplicaAssignment(partition)
       val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 1ab8a43..433ab56 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -23,6 +23,7 @@ import kafka.utils.Logging
 import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.zookeeper.KeeperException.Code
 
 import scala.collection.mutable
@@ -106,6 +107,9 @@ class ReplicaStateMachine(config: KafkaConfig,
         }
         controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
       } catch {
+        case e: ControllerMovedException =>
+          error(s"Controller moved to another broker when moving some replicas to $targetState state", e)
+          throw e
         case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
       }
     }
@@ -299,7 +303,7 @@ class ReplicaStateMachine(config: KafkaConfig,
       leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
     }
     val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
-      adjustedLeaderAndIsrs, controllerContext.epoch)
+      adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
     val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
       if (!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
         val exception = new StateChangeFailedException(s"Failed to change state of replica $replicaId for partition $partition since the leader and isr path in zookeeper is empty")
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 8d93ef2..1ef79be 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -92,7 +92,7 @@ class TopicDeletionManager(controller: KafkaController,
     } else {
       // if delete topic is disabled clean the topic entries under /admin/delete_topics
       info(s"Removing $initialTopicsToBeDeleted since delete topic is disabled")
-      zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
+      zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
     }
   }
 
@@ -251,9 +251,9 @@ class TopicDeletionManager(controller: KafkaController,
     controller.replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq, NonExistentReplica)
     topicsToBeDeleted -= topic
     topicsWithDeletionStarted -= topic
-    zkClient.deleteTopicZNode(topic)
-    zkClient.deleteTopicConfigs(Seq(topic))
-    zkClient.deleteTopicDeletions(Seq(topic))
+    zkClient.deleteTopicZNode(topic, controllerContext.epochZkVersion)
+    zkClient.deleteTopicConfigs(Seq(topic), controllerContext.epochZkVersion)
+    zkClient.deleteTopicDeletions(Seq(topic), controllerContext.epochZkVersion)
     controllerContext.removeTopic(topic)
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 59581e7..2393daa 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -166,7 +166,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /* epoch of the controller that last changed the leader */
-  @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
+  @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   private val localBrokerId = config.brokerId
   private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp =>
     new Partition(tp.topic, tp.partition, time, this)))
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index c807965..a12abb4 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,24 +21,27 @@ import java.util.Properties
 import com.yammer.metrics.core.MetricName
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
-import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls}
+import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls}
 import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.ConfigType
 import kafka.utils.Logging
 import kafka.zookeeper._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.resource.PatternType
+import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
+import org.apache.zookeeper.KeeperException.{BadVersionException, Code, ConnectionLossException, NodeExistsException}
+import org.apache.zookeeper.OpResult.{ErrorResult, SetDataResult}
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Seq, mutable}
+import scala.collection.JavaConverters._
 
 /**
  * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@@ -86,14 +89,75 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   }
 
   /**
-   * Registers a given broker in zookeeper as the controller.
+   * Registers a given broker in zookeeper as the controller and increments controller epoch.
+   * @return the (updated controller epoch, epoch zkVersion) tuple
    * @param controllerId the id of the broker that is to be registered as the controller.
-   * @param timestamp the timestamp of the controller election.
-   * @throws KeeperException if an error is returned by ZooKeeper.
-   */
-  def registerController(controllerId: Int, timestamp: Long): Unit = {
-    val path = ControllerZNode.path
-    checkedEphemeralCreate(path, ControllerZNode.encode(controllerId, timestamp))
+   * @throws ControllerMovedException if fail to create /controller or fail to increment controller epoch.
+   */
+  def registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int, Int) = {
+    val timestamp = time.milliseconds()
+
+    // Read /controller_epoch to get the current controller epoch and zkVersion,
+    // create /controller_epoch with initial value if not exists
+    val (curEpoch, curEpochZkVersion) = getControllerEpoch
+      .map(e => (e._1, e._2.getVersion))
+      .getOrElse(maybeCreateControllerEpochZNode())
+
+    // Create /controller and update /controller_epoch atomically
+    val newControllerEpoch = curEpoch + 1
+    val expectedControllerEpochZkVersion = curEpochZkVersion
+
+    debug(s"Try to create ${ControllerZNode.path} and increment controller epoch to $newControllerEpoch with expected controller epoch zkVersion $expectedControllerEpochZkVersion")
+
+    def checkControllerAndEpoch(): (Int, Int) = {
+      val curControllerId = getControllerId.getOrElse(throw new ControllerMovedException(
+        s"The ephemeral node at ${ControllerZNode.path} went away while checking whether the controller election succeeds. " +
+          s"Aborting controller startup procedure"))
+      if (controllerId == curControllerId) {
+        val (epoch, stat)  = getControllerEpoch.getOrElse(
+          throw new IllegalStateException(s"${ControllerEpochZNode.path} existed before but goes away while trying to read it"))
+
+        // If the epoch is the same as newControllerEpoch, it is safe to infer that the returned epoch zkVersion
+        // is associated with the current broker during controller election because we already knew that the zk
+        // transaction succeeds based on the controller znode verification. Other rounds of controller
+        // election will result in larger epoch number written in zk.
+        if (epoch == newControllerEpoch)
+          return (newControllerEpoch, stat.getVersion)
+      }
+      throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
+    }
+
+    def tryCreateControllerZNodeAndIncrementEpoch(): (Int, Int) = {
+      try {
+        val transaction = zooKeeperClient.createTransaction()
+        transaction.create(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp),
+          acls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL)
+        transaction.setData(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion)
+        val results = transaction.commit()
+        val setDataResult = results.get(1).asInstanceOf[SetDataResult]
+        (newControllerEpoch, setDataResult.getStat.getVersion)
+      } catch {
+        case _: NodeExistsException | _: BadVersionException => checkControllerAndEpoch()
+        case _: ConnectionLossException =>
+          zooKeeperClient.waitUntilConnected()
+          tryCreateControllerZNodeAndIncrementEpoch()
+      }
+    }
+
+    tryCreateControllerZNodeAndIncrementEpoch()
+  }
+
+  private def maybeCreateControllerEpochZNode(): (Int, Int) = {
+    createControllerEpochRaw(KafkaController.InitialControllerEpoch).resultCode match {
+      case Code.OK =>
+        info(s"Successfully created ${ControllerEpochZNode.path} with initial epoch ${KafkaController.InitialControllerEpoch}")
+        (KafkaController.InitialControllerEpoch, KafkaController.InitialControllerEpochZkVersion)
+      case Code.NODEEXISTS =>
+        val (epoch, stat) = getControllerEpoch.getOrElse(throw new IllegalStateException(s"${ControllerEpochZNode.path} existed before but goes away while trying to read it"))
+        (epoch, stat.getVersion)
+      case code =>
+        throw KeeperException.create(code)
+    }
   }
 
   def updateBrokerInfo(brokerInfo: BrokerInfo): Unit = {
@@ -119,13 +183,15 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   /**
    * Sets topic partition states for the given partitions.
    * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @return sequence of SetDataResponse whose contexts are the partitions they are associated with.
    */
-  def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
+  def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch], expectedControllerEpochZkVersion: Int): Seq[SetDataResponse] = {
     val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
       val path = TopicPartitionStateZNode.path(partition)
       val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
-      SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition))
+      SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition),
+        controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(setDataRequests.toSeq)
   }
@@ -133,15 +199,16 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   /**
    * Creates topic partition state znodes for the given partitions.
    * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @return sequence of CreateResponse whose contexts are the partitions they are associated with.
    */
-  def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
-    createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq)
-    createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq)
+  def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
+    createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq, expectedControllerEpochZkVersion)
+    createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq, expectedControllerEpochZkVersion)
     val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
       val path = TopicPartitionStateZNode.path(partition)
       val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
-      CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition))
+      CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(createRequests.toSeq)
   }
@@ -172,9 +239,10 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * Update the partition states of multiple partitions in zookeeper.
    * @param leaderAndIsrs The partition states to update.
    * @param controllerEpoch The current controller epoch.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @return UpdateLeaderAndIsrResult instance containing per partition results.
    */
-  def updateLeaderAndIsr(leaderAndIsrs: Map[TopicPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = {
+  def updateLeaderAndIsr(leaderAndIsrs: Map[TopicPartition, LeaderAndIsr], controllerEpoch: Int, expectedControllerEpochZkVersion: Int): UpdateLeaderAndIsrResult = {
     val successfulUpdates = mutable.Map.empty[TopicPartition, LeaderAndIsr]
     val updatesToRetry = mutable.Buffer.empty[TopicPartition]
     val failed = mutable.Map.empty[TopicPartition, Exception]
@@ -182,8 +250,9 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
       partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
     }
     val setDataResponses = try {
-      setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
+      setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs, expectedControllerEpochZkVersion)
     } catch {
+      case e: ControllerMovedException => throw e
       case e: Exception =>
         leaderAndIsrs.keys.foreach(partition => failed.put(partition, e))
         return UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
@@ -381,10 +450,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * Sets the topic znode with the given assignment.
    * @param topic the topic whose assignment is being set.
    * @param assignment the partition to replica mapping to set for the given topic
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @return SetDataResponse
    */
-  def setTopicAssignmentRaw(topic: String, assignment: collection.Map[TopicPartition, Seq[Int]]): SetDataResponse = {
-    val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion)
+  def setTopicAssignmentRaw(topic: String, assignment: collection.Map[TopicPartition, Seq[Int]], expectedControllerEpochZkVersion: Int): SetDataResponse = {
+    val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion,
+      zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
     retryRequestUntilConnected(setDataRequest)
   }
 
@@ -392,10 +463,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * Sets the topic znode with the given assignment.
    * @param topic the topic whose assignment is being set.
    * @param assignment the partition to replica mapping to set for the given topic
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @throws KeeperException if there is an error while setting assignment
    */
-  def setTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
-    val setDataResponse = setTopicAssignmentRaw(topic, assignment)
+  def setTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]], expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion) = {
+    val setDataResponse = setTopicAssignmentRaw(topic, assignment, expectedControllerEpochZkVersion)
     setDataResponse.maybeThrow
   }
 
@@ -443,11 +515,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   /**
    * Deletes all log dir event notifications.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
-  def deleteLogDirEventNotifications(): Unit = {
+  def deleteLogDirEventNotifications(expectedControllerEpochZkVersion: Int): Unit = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
     if (getChildrenResponse.resultCode == Code.OK) {
-      deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
+      deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber), expectedControllerEpochZkVersion)
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
       getChildrenResponse.maybeThrow
     }
@@ -456,10 +529,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   /**
    * Deletes the log dir event notifications associated with the given sequence numbers.
    * @param sequenceNumbers the sequence numbers associated with the log dir event notifications to be deleted.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
-  def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
+  def deleteLogDirEventNotifications(sequenceNumbers: Seq[String], expectedControllerEpochZkVersion: Int): Unit = {
     val deleteRequests = sequenceNumbers.map { sequenceNumber =>
-      DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), ZkVersion.MatchAnyVersion)
+      DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), ZkVersion.MatchAnyVersion,
+        zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(deleteRequests)
   }
@@ -677,9 +752,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   /**
    * Remove the given topics from the topics marked for deletion.
    * @param topics the topics to remove.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
-  def deleteTopicDeletions(topics: Seq[String]): Unit = {
-    val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.MatchAnyVersion))
+  def deleteTopicDeletions(topics: Seq[String], expectedControllerEpochZkVersion: Int): Unit = {
+    val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.MatchAnyVersion,
+      zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)))
     retryRequestsUntilConnected(deleteRequests)
   }
 
@@ -708,18 +785,20 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * exists or not.
    *
    * @param reassignment the reassignment to set on the reassignment znode
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @throws KeeperException if there is an error while setting or creating the znode
    */
-  def setOrCreatePartitionReassignment(reassignment: collection.Map[TopicPartition, Seq[Int]]): Unit = {
+  def setOrCreatePartitionReassignment(reassignment: collection.Map[TopicPartition, Seq[Int]], expectedControllerEpochZkVersion: Int): Unit = {
 
     def set(reassignmentData: Array[Byte]): SetDataResponse = {
-      val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, reassignmentData, ZkVersion.MatchAnyVersion)
+      val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, reassignmentData, ZkVersion.MatchAnyVersion,
+        zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
       retryRequestUntilConnected(setDataRequest)
     }
 
     def create(reassignmentData: Array[Byte]): CreateResponse = {
       val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, acls(ReassignPartitionsZNode.path),
-        CreateMode.PERSISTENT)
+        CreateMode.PERSISTENT, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
       retryRequestUntilConnected(createRequest)
     }
 
@@ -744,9 +823,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   /**
    * Deletes the partition reassignment znode.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
-  def deletePartitionReassignment(): Unit = {
-    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, ZkVersion.MatchAnyVersion)
+  def deletePartitionReassignment(expectedControllerEpochZkVersion: Int): Unit = {
+    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, ZkVersion.MatchAnyVersion,
+      zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
     retryRequestUntilConnected(deleteRequest)
   }
 
@@ -851,11 +932,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   /**
    * Deletes all isr change notifications.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
-  def deleteIsrChangeNotifications(): Unit = {
+  def deleteIsrChangeNotifications(expectedControllerEpochZkVersion: Int): Unit = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
     if (getChildrenResponse.resultCode == Code.OK) {
-      deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber))
+      deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber), expectedControllerEpochZkVersion)
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
       getChildrenResponse.maybeThrow
     }
@@ -864,10 +946,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   /**
    * Deletes the isr change notifications associated with the given sequence numbers.
    * @param sequenceNumbers the sequence numbers associated with the isr change notifications to be deleted.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
-  def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
+  def deleteIsrChangeNotifications(sequenceNumbers: Seq[String], expectedControllerEpochZkVersion: Int): Unit = {
     val deleteRequests = sequenceNumbers.map { sequenceNumber =>
-      DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), ZkVersion.MatchAnyVersion)
+      DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), ZkVersion.MatchAnyVersion,
+        zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(deleteRequests)
   }
@@ -897,9 +981,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   /**
    * Deletes the preferred replica election znode.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
-  def deletePreferredReplicaElection(): Unit = {
-    val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, ZkVersion.MatchAnyVersion)
+  def deletePreferredReplicaElection(expectedControllerEpochZkVersion: Int): Unit = {
+    val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, ZkVersion.MatchAnyVersion,
+      zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
     retryRequestUntilConnected(deleteRequest)
   }
 
@@ -919,9 +1005,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   /**
    * Deletes the controller znode.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
-  def deleteController(): Unit = {
-    val deleteRequest = DeleteRequest(ControllerZNode.path, ZkVersion.MatchAnyVersion)
+  def deleteController(expectedControllerEpochZkVersion: Int): Unit = {
+    val deleteRequest = DeleteRequest(ControllerZNode.path, ZkVersion.MatchAnyVersion,
+      zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
     retryRequestUntilConnected(deleteRequest)
   }
 
@@ -944,17 +1032,20 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   /**
    * Recursively deletes the topic znode.
    * @param topic the topic whose topic znode we wish to delete.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
-  def deleteTopicZNode(topic: String): Unit = {
-    deleteRecursive(TopicZNode.path(topic))
+  def deleteTopicZNode(topic: String, expectedControllerEpochZkVersion: Int): Unit = {
+    deleteRecursive(TopicZNode.path(topic), expectedControllerEpochZkVersion)
   }
 
   /**
    * Deletes the topic configs for the given topics.
    * @param topics the topics whose configs we wish to delete.
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
-  def deleteTopicConfigs(topics: Seq[String]): Unit = {
-    val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ZkVersion.MatchAnyVersion))
+  def deleteTopicConfigs(topics: Seq[String], expectedControllerEpochZkVersion: Int): Unit = {
+    val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic),
+      ZkVersion.MatchAnyVersion, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)))
     retryRequestsUntilConnected(deleteRequests)
   }
 
@@ -1403,18 +1494,19 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   /**
    * Deletes the given zk path recursively
    * @param path
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @return true if path gets deleted successfully, false if root path doesn't exist
    * @throws KeeperException if there is an error while deleting the znodes
    */
-  def deleteRecursive(path: String): Boolean = {
+  def deleteRecursive(path: String, expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion): Boolean = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
     getChildrenResponse.resultCode match {
       case Code.OK =>
-        getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child"))
-        val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, ZkVersion.MatchAnyVersion))
-        if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
+        getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child", expectedControllerEpochZkVersion))
+        val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, ZkVersion.MatchAnyVersion,
+          zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)))
+        if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE)
           throw deleteResponse.resultException.get
-        }
         true
       case Code.NONODE => false
       case _ => throw getChildrenResponse.resultException.get
@@ -1468,18 +1560,18 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   }
 
-  private def createTopicPartition(partitions: Seq[TopicPartition]): Seq[CreateResponse] = {
+  private def createTopicPartition(partitions: Seq[TopicPartition], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
     val createRequests = partitions.map { partition =>
       val path = TopicPartitionZNode.path(partition)
-      CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition))
+      CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(createRequests)
   }
 
-  private def createTopicPartitions(topics: Seq[String]): Seq[CreateResponse] = {
+  private def createTopicPartitions(topics: Seq[String], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
     val createRequests = topics.map { topic =>
       val path = TopicPartitionsZNode.path(topic)
-      CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic))
+      CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(createRequests)
   }
@@ -1513,13 +1605,16 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
         requestResponsePairs.foreach { case (request, response) =>
           if (response.resultCode == Code.CONNECTIONLOSS)
             remainingRequests += request
-          else
+          else {
+            maybeThrowControllerMoveException(response)
             responses += response
+          }
         }
 
         if (remainingRequests.nonEmpty)
           zooKeeperClient.waitUntilConnected()
       } else {
+        batchResponses.foreach(maybeThrowControllerMoveException)
         remainingRequests.clear()
         responses ++= batchResponses
       }
@@ -1599,4 +1694,31 @@ object KafkaZkClient {
       time, metricGroup, metricType)
     new KafkaZkClient(zooKeeperClient, isSecure, time)
   }
+
+  private def controllerZkVersionCheck(version: Int): Option[ZkVersionCheck] = {
+    if (version < KafkaController.InitialControllerEpochZkVersion)
+      None
+    else
+      Some(ZkVersionCheck(ControllerEpochZNode.path, version))
+  }
+
+  private def maybeThrowControllerMoveException(response: AsyncResponse): Unit = {
+    response.zkVersionCheckResult match {
+      case Some(zkVersionCheckResult) =>
+        val zkVersionCheck = zkVersionCheckResult.zkVersionCheck
+        if (zkVersionCheck.checkPath.equals(ControllerEpochZNode.path))
+          zkVersionCheckResult.opResult match {
+            case errorResult: ErrorResult =>
+              val errorCode = Code.get(errorResult.getErr)
+              // Throw ControllerMovedException when the zkVersion check on the controller epoch znode fails
+              if (errorCode == Code.BADVERSION)
+                throw new ControllerMovedException(s"Controller epoch zkVersion check fails. Expected zkVersion = ${zkVersionCheck.expectedZkVersion}")
+              else if (errorCode != Code.OK)
+                throw KeeperException.create(errorCode, zkVersionCheck.checkPath)
+            case _ =>
+          }
+      case None =>
+    }
+  }
 }
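
For illustration, a minimal sketch (not part of the commit) of how a caller builds one of
these fenced requests, assuming an epoch zkVersion already read from the controller
context; the concrete value below is hypothetical:

    import kafka.zk.{ControllerEpochZNode, ControllerZNode, ZkVersion}
    import kafka.zookeeper.{DeleteRequest, ZkVersionCheck}

    val epochZkVersion = 1 // hypothetical; normally the controller context's epochZkVersion
    // Fence the delete of /controller on the version of /controller_epoch, as
    // deleteController() does above. The target znode itself still matches any version;
    // only the controller epoch znode is version-checked.
    val fencedDelete = DeleteRequest(ControllerZNode.path, ZkVersion.MatchAnyVersion,
      zkVersionCheck = Some(ZkVersionCheck(ControllerEpochZNode.path, epochZkVersion)))
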
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 97ec9a4..5930414 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -17,21 +17,23 @@
 
 package kafka.zookeeper
 
+import java.util
 import java.util.Locale
 import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
-import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, Semaphore, TimeUnit}
+import java.util.concurrent._
 
 import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
 import kafka.utils.{KafkaScheduler, Logging}
 import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback}
+import org.apache.zookeeper.AsyncCallback._
 import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.OpResult.{CreateResult, SetDataResult}
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
 import org.apache.zookeeper.ZooKeeper.States
 import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooKeeper}
+import org.apache.zookeeper._
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Set
@@ -156,6 +158,10 @@ class ZooKeeperClient(connectString: String,
       responseQueue.asScala.toBuffer
     }
   }
+
+  def createTransaction(): Transaction = {
+    zooKeeper.transaction()
+  }
 
   // Visibility to override for testing
   private[zookeeper] def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
@@ -166,44 +172,76 @@ class ZooKeeperClient(connectString: String,
 
     val sendTimeMs = time.hiResClockMs()
     request match {
-      case ExistsRequest(path, ctx) =>
+      case ExistsRequest(path, ctx, _) =>
         zooKeeper.exists(path, shouldWatch(request), new StatCallback {
           override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
             callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs)))
         }, ctx.orNull)
-      case GetDataRequest(path, ctx) =>
+      case GetDataRequest(path, ctx, _) =>
         zooKeeper.getData(path, shouldWatch(request), new DataCallback {
           override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
             callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs)))
         }, ctx.orNull)
-      case GetChildrenRequest(path, ctx) =>
+      case GetChildrenRequest(path, ctx, _) =>
         zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
           override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat): Unit =
             callback(GetChildrenResponse(Code.get(rc), path, Option(ctx),
               Option(children).map(_.asScala).getOrElse(Seq.empty), stat, responseMetadata(sendTimeMs)))
         }, ctx.orNull)
-      case CreateRequest(path, data, acl, createMode, ctx) =>
-        zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
-          override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit =
-            callback(CreateResponse(Code.get(rc), path, Option(ctx), name, responseMetadata(sendTimeMs)))
-        }, ctx.orNull)
-      case SetDataRequest(path, data, version, ctx) =>
-        zooKeeper.setData(path, data, version, new StatCallback {
-          override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
-            callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs)))
-        }, ctx.orNull)
-      case DeleteRequest(path, version, ctx) =>
-        zooKeeper.delete(path, version, new VoidCallback {
-          override def processResult(rc: Int, path: String, ctx: Any): Unit =
-            callback(DeleteResponse(Code.get(rc), path, Option(ctx), responseMetadata(sendTimeMs)))
-        }, ctx.orNull)
-      case GetAclRequest(path, ctx) =>
+      case CreateRequest(path, data, acl, createMode, ctx, zkVersionCheck) =>
+        if (zkVersionCheck.isEmpty)
+          zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
+            override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit =
+              callback(CreateResponse(Code.get(rc), path, Option(ctx), name, responseMetadata(sendTimeMs)))
+          }, ctx.orNull)
+        else
+          zooKeeper.multi(Seq(zkVersionCheck.get.checkOp, Op.create(path, data, acl.asJava, createMode)).asJava, new MultiCallback {
+            override def processResult(rc: Int, multiOpPath: String, ctx: scala.Any, opResults: util.List[OpResult]): Unit = {
+              val (zkVersionCheckOpResult, requestOpResult) = (opResults.get(0), opResults.get(1))
+              val name = requestOpResult match {
+                case c: CreateResult => c.getPath
+                case _ => null
+              }
+              callback(CreateResponse(Code.get(rc), path, Option(ctx), name, responseMetadata(sendTimeMs), 
+                Some(ZkVersionCheckResult(zkVersionCheck.get, zkVersionCheckOpResult))))
+            }}, ctx.orNull)
+      case SetDataRequest(path, data, version, ctx, zkVersionCheck) =>
+        if (zkVersionCheck.isEmpty)
+          zooKeeper.setData(path, data, version, new StatCallback {
+            override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
+              callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs)))
+          }, ctx.orNull)
+        else
+          zooKeeper.multi(Seq(zkVersionCheck.get.checkOp, Op.setData(path, data, version)).asJava, new MultiCallback {
+            override def processResult(rc: Int, multiOpPath: String, ctx: scala.Any, opResults: util.List[OpResult]): Unit = {
+              val (zkVersionCheckOpResult, requestOpResult) = (opResults.get(0), opResults.get(1))
+              val stat = requestOpResult match {
+                case s: SetDataResult => s.getStat
+                case _ => null
+              }
+              callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs), 
+                Some(ZkVersionCheckResult(zkVersionCheck.get, zkVersionCheckOpResult))))
+            }}, ctx.orNull)
+      case DeleteRequest(path, version, ctx, zkVersionCheck) =>
+        if (zkVersionCheck.isEmpty)
+          zooKeeper.delete(path, version, new VoidCallback {
+            override def processResult(rc: Int, path: String, ctx: Any): Unit =
+              callback(DeleteResponse(Code.get(rc), path, Option(ctx), responseMetadata(sendTimeMs)))
+          }, ctx.orNull)
+        else
+          zooKeeper.multi(Seq(zkVersionCheck.get.checkOp, Op.delete(path, version)).asJava, new MultiCallback {
+            override def processResult(rc: Int, multiOpPath: String, ctx: scala.Any, opResults: util.List[OpResult]): Unit = {
+              val (zkVersionCheckOpResult, _) = (opResults.get(0), opResults.get(1))
+              callback(DeleteResponse(Code.get(rc), path, Option(ctx), responseMetadata(sendTimeMs), 
+                Some(ZkVersionCheckResult(zkVersionCheck.get, zkVersionCheckOpResult))))
+            }}, ctx.orNull)
+      case GetAclRequest(path, ctx, _) =>
         zooKeeper.getACL(path, null, new ACLCallback {
           override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = {
             callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty),
               stat, responseMetadata(sendTimeMs)))
-        }}, ctx.orNull)
-      case SetAclRequest(path, acl, version, ctx) =>
+          }}, ctx.orNull)
+      case SetAclRequest(path, acl, version, ctx, _) =>
         zooKeeper.setACL(path, acl.asJava, version, new StatCallback {
           override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
             callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs)))
@@ -329,7 +367,7 @@ class ZooKeeperClient(connectString: String,
   private[kafka] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) {
     zooKeeper
   }
-  
+
   private def reinitialize(): Unit = {
     // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
     // may require additional Zookeeper requests, which will block to acquire the initialization lock
@@ -447,45 +485,54 @@ sealed trait AsyncRequest {
   type Response <: AsyncResponse
   def path: String
   def ctx: Option[Any]
+  def zkVersionCheck: Option[ZkVersionCheck]
+}
+
+case class ZkVersionCheck(checkPath: String, expectedZkVersion: Int) {
+  def checkOp: Op = Op.check(checkPath, expectedZkVersion)
 }
 
+case class ZkVersionCheckResult(zkVersionCheck: ZkVersionCheck, opResult: OpResult)
+
 case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode,
-                         ctx: Option[Any] = None) extends AsyncRequest {
+                         ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
   type Response = CreateResponse
 }
 
-case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) extends AsyncRequest {
+case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
   type Response = DeleteResponse
 }
 
-case class ExistsRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+case class ExistsRequest(path: String, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
   type Response = ExistsResponse
 }
 
-case class GetDataRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+case class GetDataRequest(path: String, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
   type Response = GetDataResponse
 }
 
-case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
+case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
   type Response = SetDataResponse
 }
 
-case class GetAclRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+case class GetAclRequest(path: String, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
   type Response = GetAclResponse
 }
 
-case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
+case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
   type Response = SetAclResponse
 }
 
-case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+case class GetChildrenRequest(path: String, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest {
   type Response = GetChildrenResponse
 }
 
+
 sealed abstract class AsyncResponse {
   def resultCode: Code
   def path: String
   def ctx: Option[Any]
+  def zkVersionCheckResult: Option[ZkVersionCheckResult]
 
   /** Return None if the result code is OK and KeeperException otherwise. */
   def resultException: Option[KeeperException] =
@@ -506,17 +553,22 @@ case class ResponseMetadata(sendTimeMs: Long, receivedTimeMs: Long) {
   def responseTimeMs: Long = receivedTimeMs - sendTimeMs
 }
 
-case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String, metadata: ResponseMetadata) extends AsyncResponse
-case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any], metadata: ResponseMetadata) extends AsyncResponse
-case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, metadata: ResponseMetadata) extends AsyncResponse
+case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String,
+                          metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse
+case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any],
+                          metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse
+case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat,
+                          metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse
 case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte], stat: Stat,
-                           metadata: ResponseMetadata) extends AsyncResponse
-case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, metadata: ResponseMetadata) extends AsyncResponse
+                           metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse
+case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat,
+                           metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse
 case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL], stat: Stat,
-                          metadata: ResponseMetadata) extends AsyncResponse
-case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, metadata: ResponseMetadata) extends AsyncResponse
+                          metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse
+case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat,
+                          metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse
 case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat,
-                               metadata: ResponseMetadata) extends AsyncResponse
+                               metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse
 
 class ZooKeeperClientException(message: String) extends RuntimeException(message)
 class ZooKeeperClientExpiredException(message: String) extends ZooKeeperClientException(message)
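
The MultiCallback branches above are the asynchronous form of ZooKeeper's usual
check-then-write idiom. A synchronous sketch of the same pattern against the raw client,
with hypothetical paths and versions:

    import org.apache.zookeeper.{Op, ZooKeeper}
    import scala.collection.JavaConverters._

    def fencedSetData(zk: ZooKeeper, checkPath: String, expectedZkVersion: Int,
                      path: String, data: Array[Byte]): Unit = {
      val ops = Seq(
        Op.check(checkPath, expectedZkVersion), // aborts the transaction with BADVERSION on mismatch
        Op.setData(path, data, -1))             // -1 matches any version of the target znode
      zk.multi(ops.asJava)                      // both ops commit atomically, or neither does
    }
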
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 4f40b27..b44c239 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -20,7 +20,7 @@ import kafka.common.AdminCommandFailedException
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
-import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness}
+import kafka.zk.{ReassignPartitionsZNode, ZkVersion, ZooKeeperTestHarness}
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{After, Before, Test}
 import kafka.admin.ReplicationQuotaUtils._
@@ -613,7 +613,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     )
 
     // Set znode directly to avoid non-existent topic validation
-    zkClient.setOrCreatePartitionReassignment(firstMove)
+    zkClient.setOrCreatePartitionReassignment(firstMove, ZkVersion.MatchAnyVersion)
 
     servers.foreach(_.startup())
     waitForReassignmentToComplete()
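
The test passes ZkVersion.MatchAnyVersion because setOrCreatePartitionReassignment now
takes an expected controller epoch zkVersion. A negative sentinel falls below
KafkaController.InitialControllerEpochZkVersion, so controllerZkVersionCheck above
returns None and no check op is attached. A sketch of the presumed constant (the actual
definition lives in kafka.zk and is not shown in this excerpt):

    object ZkVersion {
      val MatchAnyVersion: Int = -1 // ZooKeeper treats version -1 as "match any version"
    }
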
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
index e5753e5..e0a753c 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -54,7 +54,7 @@ class ControllerEventManagerTest {
     val controllerStats = new ControllerStats
     val eventProcessedListenerCount = new AtomicInteger
     controllerEventManager = new ControllerEventManager(0, controllerStats.rateAndTimeMetrics,
-      _ => eventProcessedListenerCount.incrementAndGet)
+      _ => eventProcessedListenerCount.incrementAndGet, () => ())
     controllerEventManager.start()
 
     val initialTimerCount = timer(metricName).count
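
The new trailing "() => ()" argument suggests ControllerEventManager's constructor gained
a callback, presumably fired when the event thread catches a ControllerMovedException;
this test supplies a no-op. A sketch of what the call site implies (the real signature is
in ControllerEventManager.scala, whose diff is outside this excerpt; types are inferred):

    class ControllerEventManager(controllerId: Int,
                                 rateAndTimeMetrics: Map[ControllerState, KafkaTimer], // type inferred
                                 eventProcessedListener: ControllerEvent => Unit,
                                 controllerMovedListener: () => Unit)                  // assumed new parameter
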
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 5e5d84f..dc4076a 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -17,23 +17,29 @@
 
 package kafka.controller
 
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.Properties
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Timer
 import kafka.api.LeaderAndIsr
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils
-import kafka.zk.{PreferredReplicaElectionZNode, ZooKeeperTestHarness}
+import kafka.zk._
 import org.junit.{After, Before, Test}
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.log4j.Level
+import kafka.utils.LogCaptureAppender
 
 import scala.collection.JavaConverters._
 import scala.util.Try
 
 class ControllerIntegrationTest extends ZooKeeperTestHarness {
   var servers = Seq.empty[KafkaServer]
+  val firstControllerEpoch = KafkaController.InitialControllerEpoch + 1
+  val firstControllerEpochZkVersion = KafkaController.InitialControllerEpochZkVersion + 1
 
   @Before
   override def setUp() {
@@ -51,30 +57,30 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   def testEmptyCluster(): Unit = {
     servers = makeServers(1)
     TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
-    waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
+    waitUntilControllerEpoch(firstControllerEpoch, "broker failed to set controller epoch")
   }
 
   @Test
   def testControllerEpochPersistsWhenAllBrokersDown(): Unit = {
     servers = makeServers(1)
     TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
-    waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
+    waitUntilControllerEpoch(firstControllerEpoch, "broker failed to set controller epoch")
     servers.head.shutdown()
     servers.head.awaitShutdown()
     TestUtils.waitUntilTrue(() => !zkClient.getControllerId.isDefined, "failed to kill controller")
-    waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "controller epoch was not persisted after broker failure")
+    waitUntilControllerEpoch(firstControllerEpoch, "controller epoch was not persisted after broker failure")
   }
 
   @Test
   def testControllerMoveIncrementsControllerEpoch(): Unit = {
     servers = makeServers(1)
     TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
-    waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
+    waitUntilControllerEpoch(firstControllerEpoch, "broker failed to set controller epoch")
     servers.head.shutdown()
     servers.head.awaitShutdown()
     servers.head.startup()
     TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
-    waitUntilControllerEpoch(KafkaController.InitialControllerEpoch + 1, "controller epoch was not incremented after controller move")
+    waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was not incremented after controller move")
   }
 
   @Test
@@ -83,7 +89,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(0))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
+    waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
   }
 
@@ -97,7 +103,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers.take(1))
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
   }
 
@@ -109,8 +115,8 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val assignment = Map(tp0.partition -> Seq(0))
     val expandedAssignment = Map(tp0 -> Seq(0), tp1 -> Seq(0))
     TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
-    zkClient.setTopicAssignment(tp0.topic, expandedAssignment)
-    waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
+    zkClient.setTopicAssignment(tp0.topic, expandedAssignment, firstControllerEpochZkVersion)
+    waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic partition expansion")
     TestUtils.waitUntilMetadataIsPropagated(servers, tp1.topic, tp1.partition)
   }
@@ -127,8 +133,8 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
-    zkClient.setTopicAssignment(tp0.topic, expandedAssignment)
-    waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
+    zkClient.setTopicAssignment(tp0.topic, expandedAssignment, firstControllerEpochZkVersion)
+    waitForPartitionState(tp1, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic partition expansion")
     TestUtils.waitUntilMetadataIsPropagated(Seq(servers(controllerId)), tp1.topic, tp1.partition)
   }
@@ -147,7 +153,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val reassignment = Map(tp -> Seq(otherBrokerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     zkClient.createPartitionReassignment(reassignment)
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3,
+    waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3,
       "failed to get expected partition state after partition reassignment")
     TestUtils.waitUntilTrue(() =>  zkClient.getReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
       "failed to get updated partition assignment on topic znode after partition reassignment")
@@ -169,8 +175,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
-    zkClient.setOrCreatePartitionReassignment(reassignment)
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+    val controller = getController()
+    zkClient.setOrCreatePartitionReassignment(reassignment, controller.kafkaController.controllerContext.epochZkVersion)
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
       "failed to get expected partition state during partition reassignment with offline replica")
     TestUtils.waitUntilTrue(() => zkClient.reassignPartitionsInProgress(),
       "partition reassignment path should remain while reassignment in progress")
@@ -188,10 +195,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
     zkClient.createPartitionReassignment(reassignment)
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
       "failed to get expected partition state during partition reassignment with offline replica")
     servers(otherBrokerId).startup()
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 4,
+    waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 4,
       "failed to get expected partition state after partition reassignment")
     TestUtils.waitUntilTrue(() => zkClient.getReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
       "failed to get updated partition assignment on topic znode after partition reassignment")
@@ -235,7 +242,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     zkClient.createPreferredReplicaElection(Set(tp))
     TestUtils.waitUntilTrue(() => !zkClient.pathExists(PreferredReplicaElectionZNode.path),
       "failed to remove preferred replica leader election path after giving up")
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
       "failed to get expected partition state upon broker shutdown")
   }
 
@@ -249,10 +256,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
       "failed to get expected partition state upon broker shutdown")
     servers(otherBrokerId).startup()
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 2,
+    waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 2,
       "failed to get expected partition state upon broker startup")
   }
 
@@ -264,14 +271,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBrokerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
+    waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
     TestUtils.waitUntilTrue(() => {
       val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
-        isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
+        isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), firstControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
         leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId)
     }, "failed to get expected partition state after entire isr went offline")
   }
@@ -284,14 +291,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBrokerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
+    waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
     servers(1).shutdown()
     servers(1).awaitShutdown()
     TestUtils.waitUntilTrue(() => {
       val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
-        isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
+        isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), firstControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
         leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId)
     }, "failed to get expected partition state after entire isr went offline")
   }
@@ -341,18 +348,105 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
   }
 
+  @Test
+  def testControllerMoveOnTopicCreation(): Unit = {
+    servers = makeServers(1)
+    TestUtils.waitUntilControllerElected(zkClient)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(0))
+
+    testControllerMove(() => {
+      val adminZkClient = new AdminZkClient(zkClient)
+      adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(tp.topic, assignment, new Properties())
+    })
+  }
+
+  @Test
+  def testControllerMoveOnTopicDeletion(): Unit = {
+    servers = makeServers(1)
+    TestUtils.waitUntilControllerElected(zkClient)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(0))
+    TestUtils.createTopic(zkClient, tp.topic(), assignment, servers)
+
+    testControllerMove(() => {
+      val adminZkClient = new AdminZkClient(zkClient)
+      adminZkClient.deleteTopic(tp.topic())
+    })
+  }
+
+  @Test
+  def testControllerMoveOnPreferredReplicaElection(): Unit = {
+    servers = makeServers(1)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(0))
+    TestUtils.createTopic(zkClient, tp.topic(), assignment, servers)
+
+    testControllerMove(() => zkClient.createPreferredReplicaElection(Set(tp)))
+  }
+
+  @Test
+  def testControllerMoveOnPartitionReassignment(): Unit = {
+    servers = makeServers(1)
+    TestUtils.waitUntilControllerElected(zkClient)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(0))
+    TestUtils.createTopic(zkClient, tp.topic(), assignment, servers)
+
+    val reassignment = Map(tp -> Seq(0))
+    testControllerMove(() => zkClient.createPartitionReassignment(reassignment))
+  }
+
+  private def testControllerMove(fun: () => Unit): Unit = {
+    val controller = getController().kafkaController
+    val appender = LogCaptureAppender.createAndRegister()
+    val previousLevel = LogCaptureAppender.setClassLoggerLevel(controller.eventManager.thread.getClass, Level.INFO)
+
+    try {
+      TestUtils.waitUntilTrue(() => {
+        controller.eventManager.state == ControllerState.Idle
+      }, "Controller event thread is still busy")
+
+      val latch = new CountDownLatch(1)
+
+      // Let the controller event thread await on a latch before the pre-defined logic is triggered.
+      // This is used to make sure that when the event thread resumes and starts processing events, the controller has already moved.
+      controller.eventManager.put(KafkaController.AwaitOnLatch(latch))
+      // Execute pre-defined logic. This can be topic creation/deletion, preferred leader election, etc.
+      fun()
+
+      // Delete the controller path and re-create the /controller znode to emulate controller movement
+      zkClient.deleteController(controller.controllerContext.epochZkVersion)
+      zkClient.registerControllerAndIncrementControllerEpoch(servers.size)
+
+      // Resume the controller event thread. At this point, the controller should see a controller epoch zkVersion mismatch and resign
+      latch.countDown()
+      TestUtils.waitUntilTrue(() => !controller.isActive, "Controller failed to resign")
+
+      // Expect to capture the ControllerMovedException in the log of ControllerEventThread
+      val event = appender.getMessages.find(e => e.getLevel == Level.INFO
+        && e.getThrowableInformation != null
+        && e.getThrowableInformation.getThrowable.getClass.getName.equals(classOf[ControllerMovedException].getName))
+      assertTrue(event.isDefined)
+
+    } finally {
+      LogCaptureAppender.unregister(appender)
+      LogCaptureAppender.setClassLoggerLevel(controller.eventManager.thread.getClass, previousLevel)
+    }
+  }
+
   private def preferredReplicaLeaderElection(controllerId: Int, otherBroker: KafkaServer, tp: TopicPartition,
                                              replicas: Set[Int], leaderEpoch: Int): Unit = {
     otherBroker.shutdown()
     otherBroker.awaitShutdown()
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, leaderEpoch + 1,
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, leaderEpoch + 1,
       "failed to get expected partition state upon broker shutdown")
     otherBroker.startup()
     TestUtils.waitUntilTrue(() => zkClient.getInSyncReplicasForPartition(new TopicPartition(tp.topic, tp.partition)).get.toSet == replicas, "restarted broker failed to join in-sync replicas")
     zkClient.createPreferredReplicaElection(Set(tp))
     TestUtils.waitUntilTrue(() => !zkClient.pathExists(PreferredReplicaElectionZNode.path),
       "failed to remove preferred replica leader election path after completion")
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBroker.config.brokerId, leaderEpoch + 2,
+    waitForPartitionState(tp, firstControllerEpoch, otherBroker.config.brokerId, leaderEpoch + 2,
       "failed to get expected partition state upon broker startup")
   }
 
@@ -395,4 +489,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
       .getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Timer]
   }
 
+  private def getController(): KafkaServer = {
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    servers.filter(s => s.config.brokerId == controllerId).head
+  }
+
 }
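
testControllerMove parks the single controller event thread with
KafkaController.AwaitOnLatch, an event defined in KafkaController.scala outside this
excerpt. Presumably it simply blocks the event thread on the latch, along these lines
(a sketch, not the commit's code; the state value is assumed):

    import java.util.concurrent.CountDownLatch

    case class AwaitOnLatch(latch: CountDownLatch) extends ControllerEvent {
      def state: ControllerState = ControllerState.ControllerChange // assumed
      def process(): Unit = latch.await()
    }
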
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 6a587f3..b89632e 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -22,7 +22,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
-import kafka.zookeeper.{CreateResponse, GetDataResponse, ResponseMetadata, ZooKeeperClientException}
+import kafka.zookeeper._
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
@@ -85,7 +85,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
+    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
       .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null, ResponseMetadata(0, 0))))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true))
@@ -103,7 +103,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
+    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
       .andThrow(new ZooKeeperClientException("test"))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
@@ -119,7 +119,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
+    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
       .andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), null, ResponseMetadata(0, 0))))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
@@ -159,7 +159,7 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
@@ -190,7 +190,7 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId))
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
       partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId, otherBrokerId),
@@ -243,7 +243,7 @@ class PartitionStateMachineTest extends JUnitSuite {
       .andReturn((Map(partition.topic -> LogConfig()), Map.empty))
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
@@ -336,7 +336,7 @@ class PartitionStateMachineTest extends JUnitSuite {
       val updatedLeaderAndIsr = partitions.map { partition =>
         partition -> leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId))
       }.toMap
-      EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr, controllerEpoch))
+      EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr, controllerEpoch, controllerContext.epochZkVersion))
         .andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr, Seq.empty, Map.empty))
     }
     prepareMockToUpdateLeaderAndIsr()
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index c573c9f..ef274fa 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -183,7 +183,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)).andReturn(
       Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))
-    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch, controllerContext.epochZkVersion))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockTopicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)).andReturn(false)
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
diff --git a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
new file mode 100644
index 0000000..80472e9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.apache.log4j.{AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+
+import scala.collection.mutable.ListBuffer
+
+class LogCaptureAppender extends AppenderSkeleton {
+  private val events: ListBuffer[LoggingEvent] = ListBuffer.empty
+
+  override protected def append(event: LoggingEvent): Unit = {
+    events.synchronized {
+      events += event
+    }
+  }
+
+  def getMessages: ListBuffer[LoggingEvent] = {
+    events.synchronized {
+      events.clone()
+    }
+  }
+
+  override def close(): Unit = {
+    events.synchronized {
+      events.clear()
+    }
+  }
+
+  override def requiresLayout: Boolean = false
+}
+
+object LogCaptureAppender {
+  def createAndRegister(): LogCaptureAppender = {
+    val logCaptureAppender: LogCaptureAppender = new LogCaptureAppender
+    Logger.getRootLogger.addAppender(logCaptureAppender)
+    logCaptureAppender
+  }
+
+  def setClassLoggerLevel(clazz: Class[_], logLevel: Level): Level = {
+    val logger = Logger.getLogger(clazz)
+    val previousLevel = logger.getLevel
+    logger.setLevel(logLevel)
+    previousLevel
+  }
+
+  def unregister(logCaptureAppender: LogCaptureAppender): Unit = {
+    Logger.getRootLogger.removeAppender(logCaptureAppender)
+  }
+}
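
A usage sketch of the new helper, mirroring testControllerMove above; the class under
test and the expected message are hypothetical:

    import kafka.utils.LogCaptureAppender
    import org.apache.log4j.Level

    val appender = LogCaptureAppender.createAndRegister()
    val previousLevel = LogCaptureAppender.setClassLoggerLevel(classOf[SomeClass], Level.DEBUG) // SomeClass is a placeholder
    try {
      // ... exercise code that logs through log4j ...
      assert(appender.getMessages.exists(_.getRenderedMessage.contains("expected text")))
    } finally {
      LogCaptureAppender.unregister(appender)
      LogCaptureAppender.setClassLoggerLevel(classOf[SomeClass], previousLevel)
    }
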
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 3389161..65273eb 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -20,7 +20,7 @@ package kafka.utils
 import kafka.server.{KafkaConfig, ReplicaFetcherManager}
 import kafka.api.LeaderAndIsr
 import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.{IsrChangeNotificationZNode, TopicZNode, ZooKeeperTestHarness}
+import kafka.zk._
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{Before, Test}
@@ -42,7 +42,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
     val topicPartition = new TopicPartition(topic, partition)
     val leaderAndIsr = LeaderAndIsr(leader, leaderEpoch, isr, 1)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
-    zkClient.createTopicPartitionStatesRaw(Map(topicPartition -> leaderIsrAndControllerEpoch))
+    zkClient.createTopicPartitionStatesRaw(Map(topicPartition -> leaderIsrAndControllerEpoch), ZkVersion.MatchAnyVersion)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 48d7d3f..d42d02c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -36,7 +36,7 @@ import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import Implicits._
 import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.{AdminZkClient, BrokerIdsZNode, BrokerInfo, KafkaZkClient}
+import kafka.zk._
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry}
 import org.apache.kafka.clients.consumer._
@@ -635,7 +635,7 @@ object TestUtils extends Logging {
         .getOrElse(LeaderAndIsr(leader, List(leader)))
       topicPartition -> LeaderIsrAndControllerEpoch(newLeaderAndIsr, controllerEpoch)
     }
-    zkClient.setTopicPartitionStatesRaw(newLeaderIsrAndControllerEpochs)
+    zkClient.setTopicPartitionStatesRaw(newLeaderIsrAndControllerEpochs, ZkVersion.MatchAnyVersion)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9cffb51..61ca3bb 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -42,6 +42,7 @@ import scala.util.Random
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zookeeper._
+import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.data.Stat
 
@@ -55,12 +56,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   val topicPartition11 = new TopicPartition(topic1, 1)
   val topicPartition20 = new TopicPartition(topic2, 0)
   val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+  val controllerEpochZkVersion = 0
 
   var otherZkClient: KafkaZkClient = _
 
   @Before
   override def setUp(): Unit = {
     super.setUp()
+    zkClient.createControllerEpochRaw(1)
     otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
   }
@@ -69,6 +72,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   override def tearDown(): Unit = {
     if (otherZkClient != null)
       otherZkClient.close()
+    zkClient.deletePath(ControllerEpochZNode.path)
     super.tearDown()
   }
 
@@ -99,16 +103,29 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     zkClient.createRecursive("/delete/some/random/path")
     assertTrue(zkClient.pathExists("/delete/some/random/path"))
-    zkClient.deleteRecursive("/delete")
-    assertFalse(zkClient.pathExists("/delete/some/random/path"))
-    assertFalse(zkClient.pathExists("/delete/some/random"))
-    assertFalse(zkClient.pathExists("/delete/some"))
+    assertTrue(zkClient.deleteRecursive("/delete"))
     assertFalse(zkClient.pathExists("/delete"))
 
     intercept[IllegalArgumentException](zkClient.deleteRecursive("delete-invalid-path"))
   }
 
   @Test
+  def testDeleteRecursiveWithControllerEpochVersionCheck(): Unit = {
+    assertFalse(zkClient.deleteRecursive("/delete/does-not-exist", controllerEpochZkVersion))
+
+    zkClient.createRecursive("/delete/some/random/path")
+    assertTrue(zkClient.pathExists("/delete/some/random/path"))
+    intercept[ControllerMovedException](
+      zkClient.deleteRecursive("/delete", controllerEpochZkVersion + 1))
+
+    assertTrue(zkClient.deleteRecursive("/delete", controllerEpochZkVersion))
+    assertFalse(zkClient.pathExists("/delete"))
+
+    intercept[IllegalArgumentException](zkClient.deleteRecursive(
+      "delete-invalid-path", controllerEpochZkVersion))
+  }
+
+  @Test
   def testCreateRecursive() {
     zkClient.createRecursive("/create-newrootpath")
     assertTrue(zkClient.pathExists("/create-newrootpath"))
@@ -268,7 +285,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   @Test
   def testIsrChangeNotificationsDeletion(): Unit = {
     // Should not fail even if parent node does not exist
-    zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+    zkClient.deleteIsrChangeNotifications(Seq("0000000000"), controllerEpochZkVersion)
 
     zkClient.createRecursive("/isr_change_notification")
 
@@ -276,13 +293,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.propagateIsrChanges(Set(topicPartition10))
     zkClient.propagateIsrChanges(Set(topicPartition11))
 
-    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+    // Should throw exception if the controllerEpochZkVersion does not match
+    intercept[ControllerMovedException](zkClient.deleteIsrChangeNotifications(Seq("0000000001"), controllerEpochZkVersion + 1))
+    // Delete should not succeed
+    assertEquals(Set("0000000000", "0000000001", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
+
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"), controllerEpochZkVersion)
     // Should not fail if called on a non-existent notification
-    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"), controllerEpochZkVersion)
 
     assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
-    zkClient.deleteIsrChangeNotifications()
-    assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+    zkClient.deleteIsrChangeNotifications(controllerEpochZkVersion)
+    assertEquals(Seq.empty, zkClient.getAllIsrChangeNotifications)
   }
 
   @Test
@@ -335,7 +357,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   @Test
   def testLogDirEventNotificationsDeletion(): Unit = {
     // Should not fail even if parent node does not exist
-    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"), controllerEpochZkVersion)
 
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -346,13 +368,16 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.propagateLogDirEvent(brokerId)
     zkClient.propagateLogDirEvent(anotherBrokerId)
 
-    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+    intercept[ControllerMovedException](zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"), controllerEpochZkVersion + 1))
+    assertEquals(Seq("0000000000", "0000000001", "0000000002"), zkClient.getAllLogDirEventNotifications)
+
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"), controllerEpochZkVersion)
 
     assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
 
     zkClient.propagateLogDirEvent(anotherBrokerId)
 
-    zkClient.deleteLogDirEventNotifications()
+    zkClient.deleteLogDirEventNotifications(controllerEpochZkVersion)
     assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
   }
 
@@ -368,14 +393,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       new TopicPartition("topic_b", 0) -> Seq(4, 5),
       new TopicPartition("topic_c", 0) -> Seq(5, 3)
     )
-    zkClient.setOrCreatePartitionReassignment(reassignment)
+
+    // Should throw ControllerMovedException if the controller epoch zkVersion does not match
+    intercept[ControllerMovedException](zkClient.setOrCreatePartitionReassignment(reassignment, controllerEpochZkVersion + 1))
+
+    zkClient.setOrCreatePartitionReassignment(reassignment, controllerEpochZkVersion)
     assertEquals(reassignment, zkClient.getPartitionReassignment)
 
-    val updatedReassingment = reassignment - new TopicPartition("topic_b", 0)
-    zkClient.setOrCreatePartitionReassignment(updatedReassingment)
-    assertEquals(updatedReassingment, zkClient.getPartitionReassignment)
+    val updatedReassignment = reassignment - new TopicPartition("topic_b", 0)
+    zkClient.setOrCreatePartitionReassignment(updatedReassignment, controllerEpochZkVersion)
+    assertEquals(updatedReassignment, zkClient.getPartitionReassignment)
 
-    zkClient.deletePartitionReassignment()
+    zkClient.deletePartitionReassignment(controllerEpochZkVersion)
     assertEquals(Map.empty, zkClient.getPartitionReassignment)
 
     zkClient.createPartitionReassignment(reassignment)
@@ -513,9 +542,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testDeleteTopicZNode(): Unit = {
-    zkClient.deleteTopicZNode(topic1)
+    zkClient.deleteTopicZNode(topic1, controllerEpochZkVersion)
     zkClient.createRecursive(TopicZNode.path(topic1))
-    zkClient.deleteTopicZNode(topic1)
+    zkClient.deleteTopicZNode(topic1, controllerEpochZkVersion)
     assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
   }
 
@@ -530,7 +559,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.isTopicMarkedForDeletion(topic1))
     assertEquals(Set(topic1, topic2), zkClient.getTopicDeletions.toSet)
 
-    zkClient.deleteTopicDeletions(Seq(topic1, topic2))
+    intercept[ControllerMovedException](zkClient.deleteTopicDeletions(Seq(topic1, topic2), controllerEpochZkVersion + 1))
+    assertEquals(Set(topic1, topic2), zkClient.getTopicDeletions.toSet)
+
+    zkClient.deleteTopicDeletions(Seq(topic1, topic2), controllerEpochZkVersion)
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
@@ -564,7 +596,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps)
     assertEquals(Set(topic1, topic2), zkClient.getAllEntitiesWithConfig(ConfigType.Topic).toSet)
 
-    zkClient.deleteTopicConfigs(Seq(topic1, topic2))
+    zkClient.deleteTopicConfigs(Seq(topic1, topic2), controllerEpochZkVersion)
     assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
   }
 
@@ -742,22 +774,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       Map(
         topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
         topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
-      zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4, controllerEpochZkVersion))
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion)
 
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    // A mismatched controller epoch zkVersion must fail the update
+    intercept[ControllerMovedException](zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4, controllerEpochZkVersion + 1))
 
+    // Successful updates
     checkUpdateLeaderAndIsrResult(
       leaderIsrs(state = 1, zkVersion = 1),
       mutable.ArrayBuffer.empty,
       Map.empty,
-      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0), controllerEpoch = 4, controllerEpochZkVersion))
 
     // Try to update with wrong ZK version
     checkUpdateLeaderAndIsrResult(
       Map.empty,
       ArrayBuffer(topicPartition10, topicPartition11),
       Map.empty,
-      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0), controllerEpoch = 4, controllerEpochZkVersion))
 
     // Trigger successful, to-be-retried, and failed partitions in the same call
     val mixedState = Map(
@@ -770,7 +806,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       ArrayBuffer(topicPartition11),
       Map(
         topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
-      zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4, controllerEpochZkVersion))
   }
 
   private def checkGetDataResponse(
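
The updateLeaderAndIsr assertions above distinguish three per-partition outcomes: success, a stale partition-state zkVersion (returned for retry), and hard failures such as NONODE. A sketch of the two distinct version checks involved, under the same multi() assumption as earlier (paths and helper name are illustrative):

    import scala.collection.JavaConverters._
    import org.apache.zookeeper.{Op, ZooKeeper}

    // Sketch: the epoch check fences out a deposed controller, while the expected
    // version on the partition state znode guards against concurrent ISR writers.
    // A real implementation would inspect which op failed, to tell fencing errors
    // (throw ControllerMovedException) from retriable BADVERSIONs on the state node.
    def fencedUpdateState(zk: ZooKeeper, stateBytes: Array[Byte],
                          epochZkVersion: Int, partitionZkVersion: Int): Unit = {
      val ops = Seq(
        Op.check("/controller_epoch", epochZkVersion),
        Op.setData("/brokers/topics/topic1/partitions/0/state", stateBytes, partitionZkVersion)
      )
      zk.multi(ops.asJava)
    }
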
@@ -786,9 +822,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
   }
 
-  private def eraseMetadata(response: CreateResponse): CreateResponse =
-    response.copy(metadata = ResponseMetadata(0, 0))
-
+  private def eraseUncheckedInfoInCreateResponse(response: CreateResponse): CreateResponse =
+    response.copy(metadata = ResponseMetadata(0, 0), zkVersionCheckResult = None)
+
   @Test
   def testGetTopicsAndPartitions(): Unit = {
     assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
@@ -800,7 +836,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertTrue(zkClient.getAllPartitions.isEmpty)
 
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion)
     assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
   }
 
@@ -808,14 +844,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
     zkClient.createRecursive(TopicZNode.path(topic1))
 
+    // A mismatched controller epoch zkVersion must fail the create
+    intercept[ControllerMovedException](zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion + 1))
+
     assertEquals(
       Seq(
         CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
           TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
         CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
           TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
-      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-        .map(eraseMetadata).toList)
+      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion)
+        .map(eraseUncheckedInfoInCreateResponse).toList)
 
     val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
     assertEquals(2, getResponses.size)
@@ -824,11 +863,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // Trying to create existing topicPartition states fails
     assertEquals(
       Seq(
-        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
-          null, ResponseMetadata(0, 0)),
-        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
-          null, ResponseMetadata(0, 0))),
-      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10), null, ResponseMetadata(0, 0)),
+        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11), null, ResponseMetadata(0, 0))),
+      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion).map(eraseUncheckedInfoInCreateResponse).toList)
   }
 
   @Test
@@ -837,7 +874,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
       topicPartitions.map { topicPartition =>
         SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
-          Some(topicPartition), stat, ResponseMetadata(0, 0))
+          Some(topicPartition), stat, ResponseMetadata(0, 0), None)
       }
 
     zkClient.createRecursive(TopicZNode.path(topic1))
@@ -845,16 +882,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // Trying to set non-existing topicPartition's data results in NONODE responses
     assertEquals(
       expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
-      zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
-        _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+      zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion).map {
+        _.copy(metadata = ResponseMetadata(0, 0), zkVersionCheckResult = None)}.toList)
 
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion)
 
     assertEquals(
       expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
-      zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map {
-        eraseMetadataAndStat}.toList)
+      zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), controllerEpochZkVersion).map {
+        eraseUncheckedInfoInSetDataResponse}.toList)
 
+    // A mismatched controller epoch zkVersion must fail the update
+    intercept[ControllerMovedException](zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), controllerEpochZkVersion + 1))
 
     val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
     assertEquals(2, getResponses.size)
@@ -863,8 +902,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // Other ZK client can also write the state of a partition
     assertEquals(
       expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
-      otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map {
-        eraseMetadataAndStat}.toList)
+      otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1), controllerEpochZkVersion).map {
+        eraseUncheckedInfoInSetDataResponse}.toList)
   }
 
   @Test
@@ -881,7 +920,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     zkClient.createRecursive(TopicZNode.path(topic1))
 
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion)
     assertEquals(
       initialLeaderIsrAndControllerEpochs,
       zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
@@ -906,36 +945,38 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   }
 
-  private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
+  private def eraseUncheckedInfoInSetDataResponse(response: SetDataResponse): SetDataResponse = {
     val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
-    response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+    response.copy(metadata = ResponseMetadata(0, 0), stat = stat, zkVersionCheckResult = None)
   }
 
   @Test
   def testControllerEpochMethods(): Unit = {
+    zkClient.deletePath(ControllerEpochZNode.path)
+
     assertEquals(None, zkClient.getControllerEpoch)
 
     assertEquals("Setting non existing nodes should return NONODE results",
       SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
-      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+      eraseUncheckedInfoInSetDataResponse(zkClient.setControllerEpochRaw(1, 0)))
 
     assertEquals("Creating non existing nodes is OK",
       CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
-      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+      eraseUncheckedInfoInCreateResponse(zkClient.createControllerEpochRaw(0)))
     assertEquals(0, zkClient.getControllerEpoch.get._1)
 
     assertEquals("Attemt to create existing nodes should return NODEEXISTS",
       CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
-      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+      eraseUncheckedInfoInCreateResponse(zkClient.createControllerEpochRaw(0)))
 
     assertEquals("Updating existing nodes is OK",
       SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
-      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+      eraseUncheckedInfoInSetDataResponse(zkClient.setControllerEpochRaw(1, 0)))
     assertEquals(1, zkClient.getControllerEpoch.get._1)
 
     assertEquals("Updating with wrong ZK version returns BADVERSION",
       SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
-      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+      eraseUncheckedInfoInSetDataResponse(zkClient.setControllerEpochRaw(1, 0)))
   }
 
   @Test
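
testControllerEpochMethods above exercises plain optimistic concurrency on /controller_epoch: the znode's version starts at 0 on creation, each successful setData bumps it, and reusing a stale expected version yields BADVERSION. The same dance with the raw client would look roughly like this (helper name illustrative):

    import org.apache.zookeeper.ZooKeeper
    import org.apache.zookeeper.data.Stat

    // Sketch: every successful conditional setData advances the Stat version.
    def bumpEpochSketch(zk: ZooKeeper, newEpochBytes: Array[Byte]): Unit = {
      val stat = new Stat()
      zk.getData("/controller_epoch", false, stat)                    // read current zkVersion
      zk.setData("/controller_epoch", newEpochBytes, stat.getVersion) // succeeds once
      // a second call with the same stat.getVersion would throw
      // KeeperException.BadVersionException (the test's Code.BADVERSION)
    }
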
@@ -943,9 +984,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // No controller
     assertEquals(None, zkClient.getControllerId)
     // Create controller
-    zkClient.registerController(controllerId = 1, timestamp = 123456)
+    val (_, newEpochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(controllerId = 1)
     assertEquals(Some(1), zkClient.getControllerId)
-    zkClient.deleteController()
+    zkClient.deleteController(newEpochZkVersion)
     assertEquals(None, zkClient.getControllerId)
   }
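
The controller test above also shows the new API shape: election and epoch increment happen in one call, and the returned epoch zkVersion is the fencing token that every later controller write, including resignation, must carry. A usage sketch (the topic name is hypothetical; the fenced calls are the ones exercised elsewhere in this file):

    // Sketch: thread the election-time zkVersion through all fenced writes so a
    // deposed controller's late updates fail fast with ControllerMovedException.
    val (newEpoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(controllerId = 1)
    zkClient.deleteTopicDeletions(Seq("some-deleted-topic"), epochZkVersion)
    zkClient.deleteController(epochZkVersion)
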
 
@@ -1002,7 +1043,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       zkClient.createPreferredReplicaElection(electionPartitions)
     }
 
-    zkClient.deletePreferredReplicaElection()
+    // A mismatched controller epoch zkVersion must fail the delete
+    intercept[ControllerMovedException](zkClient.deletePreferredReplicaElection(controllerEpochZkVersion + 1))
+    assertEquals(electionPartitions, zkClient.getPreferredReplicaElection)
+
+    zkClient.deletePreferredReplicaElection(controllerEpochZkVersion)
     assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
   }