You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/12/04 01:25:43 UTC

git commit: KAFKA-1156 Improve reassignment tool to output the existing assignment to facilitate rollbacks; reviewed by Jun Rao

Updated Branches:
  refs/heads/trunk b638b0f34 -> ea3961fc8


KAFKA-1156 Improve reassignment tool to output the existing assignment to facilitate rollbacks; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ea3961fc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ea3961fc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ea3961fc

Branch: refs/heads/trunk
Commit: ea3961fc84c5a45a5458b9db4080b32f28855f7b
Parents: b638b0f
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Dec 3 16:25:31 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Dec 3 16:25:40 2013 -0800

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala | 231 ++++++++++---------
 .../kafka/controller/KafkaController.scala      |  10 +-
 2 files changed, 123 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ea3961fc/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 70d1b81..65c04ed 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -26,127 +26,27 @@ import kafka.common.{TopicAndPartition, AdminCommandFailedException}
 object ReassignPartitionsCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
-    val parser = new OptionParser
-    val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "The JSON file with the list of topics to reassign." +
-      "This option or manual-assignment-json-file needs to be specified. The format to use is - \n" +
-       "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}")
-      .withRequiredArg
-      .describedAs("topics to reassign json file path")
-      .ofType(classOf[String])
-
-    val manualAssignmentJsonFileOpt = parser.accepts("manual-assignment-json-file", "The JSON file with the list of manual reassignments" +
-      "This option or topics-to-move-json-file needs to be specified. The format to use is - \n" +
-      "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 1,\n\t  \"replicas\": [1,2,3] }],\n\"version\":1\n}")
-      .withRequiredArg
-      .describedAs("manual assignment json file path")
-      .ofType(classOf[String])
 
-    val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" +
-      " in the form \"0,1,2\". This is required for automatic topic reassignment.")
-      .withRequiredArg
-      .describedAs("brokerlist")
-      .ofType(classOf[String])
+    val opts = new ReassignPartitionsCommandOptions(args)
 
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
-      "form host:port. Multiple URLS can be given to allow fail-over.")
-      .withRequiredArg
-      .describedAs("urls")
-      .ofType(classOf[String])
-
-    val executeOpt = parser.accepts("execute", "This option does the actual reassignment. By default, the tool does a dry run")
-      .withOptionalArg()
-      .describedAs("execute")
-      .ofType(classOf[String])
-
-    val statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
-      "new replicas they should be reassigned to, which can be obtained from the output of a dry run.")
-      .withRequiredArg
-      .describedAs("partition reassignment json file path")
-      .ofType(classOf[String])
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(zkConnectOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
+    // should have exactly one action
+    val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _)
+    if(actions != 1) {
+      opts.parser.printHelpOn(System.err)
+      Utils.croak("Command must include exactly one action: --generate, --execute or --verify")
     }
 
-    if (options.has(topicsToMoveJsonFileOpt) && options.has(manualAssignmentJsonFileOpt)) {
-      System.err.println("Only one of the json files should be specified")
-      parser.printHelpOn(System.err)
-      System.exit(1)
-    }
+    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
 
-    val zkConnect = options.valueOf(zkConnectOpt)
+    val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
     var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
     try {
-
-      var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
-
-      if(options.has(statusCheckJsonFileOpt)) {
-        val jsonFile = options.valueOf(statusCheckJsonFileOpt)
-        val jsonString = Utils.readFileAsString(jsonFile)
-        val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
-
-        println("Status of partition reassignment:")
-        val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
-        reassignedPartitionsStatus.foreach { partition =>
-          partition._2 match {
-            case ReassignmentCompleted =>
-              println("Reassignment of partition %s completed successfully".format(partition._1))
-            case ReassignmentFailed =>
-              println("Reassignment of partition %s failed".format(partition._1))
-            case ReassignmentInProgress =>
-              println("Reassignment of partition %s is still in progress".format(partition._1))
-          }
-        }
-      } else if(options.has(topicsToMoveJsonFileOpt)) {
-        if(!options.has(brokerListOpt)) {
-          System.err.println("broker-list is required if topics-to-move-json-file is used")
-          parser.printHelpOn(System.err)
-          System.exit(1)
-        }
-        val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
-        val brokerListToReassign = options.valueOf(brokerListOpt).split(',').map(_.toInt)
-        val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
-        val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
-        val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
-
-        val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
-        groupedByTopic.foreach { topicInfo =>
-          val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
-            topicInfo._2.head._2.size)
-          partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
-        }
-      } else if (options.has(manualAssignmentJsonFileOpt)) {
-        val manualAssignmentJsonFile =  options.valueOf(manualAssignmentJsonFileOpt)
-        val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile)
-        partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(manualAssignmentJsonString)
-        if (partitionsToBeReassigned.isEmpty)
-          throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(manualAssignmentJsonFileOpt))
-      } else {
-        System.err.println("Missing json file. One of the file needs to be specified")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-
-      if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt)) {
-        if (options.has(executeOpt)) {
-          val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
-
-          if(reassignPartitionsCommand.reassignPartitions())
-            println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
-          else
-            println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
-        } else {
-          System.out.println("This is a dry run (Use --execute to do the actual reassignment. " +
-            "The following is the replica assignment. Save it for the status check option.\n" +
-            ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))
-        }
-      }
+      if(opts.options.has(opts.verifyOpt))
+        verifyAssignment(zkClient, opts)
+      else if(opts.options.has(opts.generateOpt))
+        generateAssignment(zkClient, opts)
+      else if (opts.options.has(opts.executeOpt))
+        executeAssignment(zkClient, opts)
     } catch {
       case e: Throwable =>
         println("Partitions reassignment failed due to " + e.getMessage)
@@ -157,6 +57,76 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
+  def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+    if(!opts.options.has(opts.reassignmentJsonFileOpt)) {
+      opts.parser.printHelpOn(System.err)
+      Utils.croak("If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
+    }
+    val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
+    val jsonString = Utils.readFileAsString(jsonFile)
+    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
+
+    println("Status of partition reassignment:")
+    val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
+    reassignedPartitionsStatus.foreach { partition =>
+      partition._2 match {
+        case ReassignmentCompleted =>
+          println("Reassignment of partition %s completed successfully".format(partition._1))
+        case ReassignmentFailed =>
+          println("Reassignment of partition %s failed".format(partition._1))
+        case ReassignmentInProgress =>
+          println("Reassignment of partition %s is still in progress".format(partition._1))
+      }
+    }
+  }
+
+  def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+    if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt))) {
+      opts.parser.printHelpOn(System.err)
+      Utils.croak("If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
+    }
+    val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
+    val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
+    val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
+    val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+    val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
+
+    var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
+    val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
+    groupedByTopic.foreach { topicInfo =>
+      val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
+        topicInfo._2.head._2.size)
+      partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
+    }
+    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
+    println("Current partition replica assignment\n\n%s"
+      .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+    println("Proposed partition reassignment configuration\n\n%s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
+  }
+
+  def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+    if(!opts.options.has(opts.reassignmentJsonFileOpt)) {
+      opts.parser.printHelpOn(System.err)
+      Utils.croak("If --execute option is used, command must include --reassignment-json-file that was output " +
+        "during the --generate option")
+    }
+    val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
+    val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
+    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString)
+    if (partitionsToBeReassigned.isEmpty)
+      throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+    // before starting assignment, output the current replica assignment to facilitate rollback
+    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
+    println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
+      .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+    // start the reassignment
+    if(reassignPartitionsCommand.reassignPartitions())
+      println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+    else
+      println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+  }
+
   private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
   :Map[TopicAndPartition, ReassignmentStatus] = {
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
@@ -185,6 +155,39 @@ object ReassignPartitionsCommand extends Logging {
         }
     }
   }
+
+  class ReassignPartitionsCommandOptions(args: Array[String]) {
+    val parser = new OptionParser
+
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
+                      "form host:port. Multiple URLS can be given to allow fail-over.")
+                      .withRequiredArg
+                      .describedAs("urls")
+                      .ofType(classOf[String])
+    val generateOpt = parser.accepts("generate", "Generate a candidate partition reassignment configuration." +
+      " Note that this only generates a candidate assignment, it does not execute it.")
+    val executeOpt = parser.accepts("execute", "Kick off the reassignment as specified by the --reassignment-json-file option.")
+    val verifyOpt = parser.accepts("verify", "Verify if the reassignment completed as specified by the --reassignment-json-file option.")
+    val reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", "The JSON file with the partition reassignment configuration" +
+                      "The format to use is - \n" +
+                      "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 1,\n\t  \"replicas\": [1,2,3] }],\n\"version\":1\n}")
+                      .withRequiredArg
+                      .describedAs("manual assignment json file path")
+                      .ofType(classOf[String])
+    val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "Generate a reassignment configuration to move the partitions" +
+                      " of the specified topics to the list of brokers specified by the --broker-list option. The format to use is - \n" +
+                      "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}")
+                      .withRequiredArg
+                      .describedAs("topics to reassign json file path")
+                      .ofType(classOf[String])
+    val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" +
+                      " in the form \"0,1,2\". This is required if --topics-to-move-json-file is used to generate reassignment configuration")
+                      .withRequiredArg
+                      .describedAs("brokerlist")
+                      .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+  }
 }
 
 class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea3961fc/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4c319ab..a1e0f29 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -705,16 +705,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
-    // stop watching the ISR changes for this partition
-    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
-      controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+    if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
+      // stop watching the ISR changes for this partition
+      zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+        controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+    }
     // read the current list of reassigned partitions from zookeeper
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
     // remove this partition from that list
     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
     // write the new list to zookeeper
     ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
-    // update the cache
+    // update the cache. NO-OP if the partition's reassignment was never started
     controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
   }