Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:15 UTC

[30/36] git commit: KAFKA-990 Fix ReassignPartitionCommand and improve usability; reviewed by Neha, Jun, Joel and Guozhang

KAFKA-990 Fix ReassignPartitionCommand and improve usability; reviewed by Neha, Jun, Joel and Guozhang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39fc578d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39fc578d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39fc578d

Branch: refs/heads/trunk
Commit: 39fc578d5da3f871fbcd5205de010f1f574507d1
Parents: 7640bee
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Aug 27 22:12:25 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Aug 27 22:12:35 2013 -0700

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala |  94 +++++++++++---
 .../kafka/controller/KafkaController.scala      | 130 +++++++++----------
 .../kafka/controller/ReplicaStateMachine.scala  |   7 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  18 +++
 4 files changed, 162 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
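
For readers tracking the usability changes: based on the options added in this patch, a typical invocation of the
reworked tool looks roughly like the following. This is only a sketch; the bin/kafka-reassign-partitions.sh wrapper
name, the ZooKeeper address and the broker ids are assumptions for illustration, not part of this commit.

  # dry run (the default): compute and print a proposed assignment for the listed topics across brokers 0,1,2
  bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
    --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2"

  # apply a hand-written per-partition assignment
  bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
    --manual-assignment-json-file manual-assignment.json --execute

Without --execute the tool only prints the computed replica assignment; with --execute it writes the reassignment to
ZooKeeper so the controller can pick it up from the /admin/reassign_partitions path.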


http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 8d287f4..aa61fa1 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -18,6 +18,7 @@ package kafka.admin
 
 import joptsimple.OptionParser
 import kafka.utils._
+import collection._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{TopicAndPartition, AdminCommandFailedException}
@@ -26,21 +27,40 @@ object ReassignPartitionsCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser
-    val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
-      "new replicas they should be reassigned to in the following format - \n" +
-       "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 1,\n\t  \"replicas\": [1,2,3] }]\n}")
+    val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "The JSON file with the list of topics to reassign. " +
+      "This option or manual-assignment-json-file needs to be specified. The format to use is - \n" +
+       "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}")
       .withRequiredArg
-      .describedAs("partition reassignment json file path")
+      .describedAs("topics to reassign json file path")
       .ofType(classOf[String])
+
+    val manualAssignmentJsonFileOpt = parser.accepts("manual-assignment-json-file", "The JSON file with the list of manual reassignments. " +
+      "This option or topics-to-move-json-file needs to be specified. The format to use is - \n" +
+      "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 1,\n\t  \"replicas\": [1,2,3] }],\n\"version\":1\n}")
+      .withRequiredArg
+      .describedAs("manual assignment json file path")
+      .ofType(classOf[String])
+
+    val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" +
+      " in the form \"0,1,2\". This is required for automatic topic reassignment.")
+      .withRequiredArg
+      .describedAs("brokerlist")
+      .ofType(classOf[String])
+
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
       "form host:port. Multiple URLS can be given to allow fail-over.")
       .withRequiredArg
       .describedAs("urls")
       .ofType(classOf[String])
 
+    val executeOpt = parser.accepts("execute", "This option does the actual reassignment. By default, the tool does a dry run")
+      .withOptionalArg()
+      .describedAs("execute")
+      .ofType(classOf[String])
+
     val options = parser.parse(args : _*)
 
-    for(arg <- List(jsonFileOpt, zkConnectOpt)) {
+    for(arg <- List(zkConnectOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
@@ -48,24 +68,56 @@ object ReassignPartitionsCommand extends Logging {
       }
     }
 
-    val jsonFile = options.valueOf(jsonFileOpt)
-    val zkConnect = options.valueOf(zkConnectOpt)
-    val jsonString = Utils.readFileAsString(jsonFile)
-    var zkClient: ZkClient = null
+    if (options.has(topicsToMoveJsonFileOpt) && options.has(manualAssignmentJsonFileOpt)) {
+      System.err.println("Only one of the json files should be specified")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
 
+    val zkConnect = options.valueOf(zkConnectOpt)
+    var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
     try {
-      // read the json file into a string
-      val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
-      if (partitionsToBeReassigned.isEmpty)
-        throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))
-
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
-
-      if(reassignPartitionsCommand.reassignPartitions())
-        println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
-      else
-        println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+
+      var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
+
+      if(options.has(topicsToMoveJsonFileOpt)) {
+        val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
+        val brokerList = options.valueOf(brokerListOpt)
+        val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
+        val brokerListToReassign = brokerList.split(',') map (_.toInt)
+        val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+        val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
+
+        val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
+        groupedByTopic.foreach { topicInfo =>
+          val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
+            topicInfo._2.head._2.size)
+          partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
+        }
+
+      } else if (options.has(manualAssignmentJsonFileOpt)) {
+        val manualAssignmentJsonFile =  options.valueOf(manualAssignmentJsonFileOpt)
+        val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile)
+        partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(manualAssignmentJsonString)
+        if (partitionsToBeReassigned.isEmpty)
+          throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(manualAssignmentJsonFile))
+      } else {
+        System.err.println("Missing json file. One of the file needs to be specified")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+
+      if (options.has(executeOpt)) {
+        val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+
+        if(reassignPartitionsCommand.reassignPartitions())
+          println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+        else
+          println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+      } else {
+        System.out.println("This is a dry run (Use --execute to do the actual reassignment. " +
+          "The replica assignment is \n" + partitionsToBeReassigned.toString())
+      }
     } catch {
       case e =>
         println("Partitions reassignment failed due to " + e.getMessage)

http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index bde405a..ab18b7a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -356,13 +356,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * Reassigning replicas for a partition goes through a few stages -
    * RAR = Reassigned replicas
    * AR = Original list of replicas for partition
-   * 1. Register listener for ISR changes to detect when the RAR is a subset of the ISR
-   * 2. Start new replicas RAR - AR.
-   * 3. Wait until new replicas are in sync with the leader
-   * 4. If the leader is not in RAR, elect a new leader from RAR
-   * 5. Stop old replicas AR - RAR
-   * 6. Write new AR
-   * 7. Remove partition from the /admin/reassign_partitions path
+   * 1. Start new replicas RAR - AR.
+   * 2. Wait until new replicas are in sync with the leader
+   * 3. If the leader is not in RAR, elect a new leader from RAR
+   * 4. Stop old replicas AR - RAR
+   * 5. Write new AR
+   * 6. Remove partition from the /admin/reassign_partitions path
    */
   def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
@@ -395,6 +394,54 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
+  private def watchIsrChangesForReassignedPartition(topic: String,
+                                                    partition: Int,
+                                                    reassignedPartitionContext: ReassignedPartitionsContext) {
+    val reassignedReplicas = reassignedPartitionContext.newReplicas
+    val isrChangeListener = new ReassignedPartitionsIsrChangeListener(this, topic, partition,
+      reassignedReplicas.toSet)
+    reassignedPartitionContext.isrChangeListener = isrChangeListener
+    // register listener on the leader and isr path to wait until they catch up with the current leader
+    zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
+  }
+
+  def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
+                                        reassignedPartitionContext: ReassignedPartitionsContext) {
+    val newReplicas = reassignedPartitionContext.newReplicas
+    val topic = topicAndPartition.topic
+    val partition = topicAndPartition.partition
+    val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+    try {
+      val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
+      assignedReplicasOpt match {
+        case Some(assignedReplicas) =>
+          if(assignedReplicas == newReplicas) {
+            throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
+              " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
+          } else {
+            if(aliveNewReplicas == newReplicas) {
+              info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
+              // first register ISR change listener
+              watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
+              controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+              onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
+            } else {
+              // some replica in RAR is not alive. Fail partition reassignment
+              throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
+                " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
+                "Failing partition reassignment")
+            }
+          }
+        case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
+          .format(topicAndPartition))
+      }
+    } catch {
+      case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
+      // remove the partition from the admin path to unblock the admin client
+      removePartitionFromReassignedPartitions(topicAndPartition)
+    }
+  }
+
   def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
@@ -501,12 +548,17 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     val reassignedPartitions = partitionsBeingReassigned.filter(partition =>
       controllerContext.partitionReplicaAssignment(partition._1) == partition._2.newReplicas).map(_._1)
     reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
-    controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned
-    controllerContext.partitionsBeingReassigned --= reassignedPartitions
+    var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
+    partitionsToReassign ++= partitionsBeingReassigned
+    partitionsToReassign --= reassignedPartitions
+
     info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
     info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
-    info("Resuming reassignment of partitions: %s".format(controllerContext.partitionsBeingReassigned.toString()))
-    controllerContext.partitionsBeingReassigned.foreach(partition => onPartitionReassignment(partition._1, partition._2))
+    info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString()))
+
+    partitionsToReassign.foreach { topicPartitionToReassign =>
+      initiateReassignReplicasForTopicPartition(topicPartitionToReassign._1, topicPartitionToReassign._2)
+    }
   }
 
   private def initializeAndMaybeTriggerPreferredReplicaElection() {
@@ -595,8 +647,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     // stop watching the ISR changes for this partition
     zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
       controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
-    // update the assigned replica list
-    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
   }
 
   private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
@@ -795,39 +845,8 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
     val newPartitions = partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
     newPartitions.foreach { partitionToBeReassigned =>
       controllerContext.controllerLock synchronized {
-        val topic = partitionToBeReassigned._1.topic
-        val partition = partitionToBeReassigned._1.partition
-        val newReplicas = partitionToBeReassigned._2
-        val topicAndPartition = partitionToBeReassigned._1
-        val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
-        try {
-          val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
-          assignedReplicasOpt match {
-            case Some(assignedReplicas) =>
-              if(assignedReplicas == newReplicas) {
-                throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
-                  " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
-              } else {
-                if(aliveNewReplicas == newReplicas) {
-                  info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
-                  val context = createReassignmentContextForPartition(topic, partition, newReplicas)
-                  controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
-                  controller.onPartitionReassignment(topicAndPartition, context)
-                } else {
-                  // some replica in RAR is not alive. Fail partition reassignment
-                  throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
-                    " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
-                    "Failing partition reassignment")
-                }
-              }
-            case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
-              .format(topicAndPartition))
-          }
-        } catch {
-          case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
-          // remove the partition from the admin path to unblock the admin client
-          controller.removePartitionFromReassignedPartitions(topicAndPartition)
-        }
+        val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
+        controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
       }
     }
   }
@@ -840,25 +859,6 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
   }
-
-  private def createReassignmentContextForPartition(topic: String,
-                                                    partition: Int,
-                                                    newReplicas: Seq[Int]): ReassignedPartitionsContext = {
-    val context = new ReassignedPartitionsContext(newReplicas)
-    // first register ISR change listener
-    watchIsrChangesForReassignedPartition(topic, partition, context)
-    context
-  }
-
-  private def watchIsrChangesForReassignedPartition(topic: String, partition: Int,
-                                                    reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val isrChangeListener = new ReassignedPartitionsIsrChangeListener(controller, topic, partition,
-      reassignedReplicas.toSet)
-    reassignedPartitionContext.isrChangeListener = isrChangeListener
-    // register listener on the leader and isr path to wait until they catch up with the current leader
-    zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
-  }
 }
 
 class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
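
To make the RAR/AR bookkeeping described in the onPartitionReassignment comment above concrete, here is a small
stand-alone Scala sketch. The replica ids are made up for illustration; only the set arithmetic mirrors what the
controller does.

  // AR  = replicas currently assigned to the partition
  // RAR = reassigned replicas read from /admin/reassign_partitions
  val assignedReplicas   = Seq(1, 2, 3)   // AR
  val reassignedReplicas = Seq(2, 3, 4)   // RAR

  // step 1: new replicas that must be started (RAR - AR)
  val replicasToStart = reassignedReplicas.filterNot(assignedReplicas.contains(_))   // Seq(4)
  // step 4: old replicas that must be stopped once RAR has caught up (AR - RAR)
  val replicasToStop  = assignedReplicas.filterNot(reassignedReplicas.contains(_))   // Seq(1)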

http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 9f752f4..c964857 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -182,7 +182,12 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                     case None =>
                       true
                   }
-                else false
+                else {
+                  replicaState.put((topic, partition, replicaId), OfflineReplica)
+                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
+                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+                  false
+                }
               case None =>
                 true
             }
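
The ZkUtils change below adds a parseTopicsData helper that turns the topics-to-move file into a plain list of topic
names. A quick illustration of the expected behaviour (a sketch, not a test shipped with this patch):

  val json = """{"topics": [{"topic": "foo"},{"topic": "foo1"}], "version":1}"""
  ZkUtils.parseTopicsData(json)   // returns Seq("foo", "foo1")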

http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ba5eacc..ca1ce12 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -629,6 +629,24 @@ object ZkUtils extends Logging {
     reassignedPartitions
   }
 
+  def parseTopicsData(jsonData: String): Seq[String] = {
+    var topics = List.empty[String]
+    Json.parseFull(jsonData) match {
+      case Some(m) =>
+        m.asInstanceOf[Map[String, Any]].get("topics") match {
+          case Some(partitionsSeq) =>
+            val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
+            mapPartitionSeq.foreach(p => {
+              val topic = p.get("topic").get.asInstanceOf[String]
+              topics ++= List(topic)
+            })
+          case None =>
+        }
+      case None =>
+    }
+    topics
+  }
+
   def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
     var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
     for (p <- partitionsToBeReassigned) {