You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original archive page and is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/12/04 01:25:43 UTC
git commit: KAFKA-1156 Improve reassignment tool to output the
existing assignment to facilitate rollbacks; reviewed by Jun Rao
Updated Branches:
refs/heads/trunk b638b0f34 -> ea3961fc8
KAFKA-1156 Improve reassignment tool to output the existing assignment to facilitate rollbacks; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ea3961fc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ea3961fc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ea3961fc
Branch: refs/heads/trunk
Commit: ea3961fc84c5a45a5458b9db4080b32f28855f7b
Parents: b638b0f
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Dec 3 16:25:31 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Dec 3 16:25:40 2013 -0800
----------------------------------------------------------------------
.../kafka/admin/ReassignPartitionsCommand.scala | 231 ++++++++++---------
.../kafka/controller/KafkaController.scala | 10 +-
2 files changed, 123 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ea3961fc/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 70d1b81..65c04ed 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -26,127 +26,27 @@ import kafka.common.{TopicAndPartition, AdminCommandFailedException}
object ReassignPartitionsCommand extends Logging {
def main(args: Array[String]): Unit = {
- val parser = new OptionParser
- val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "The JSON file with the list of topics to reassign." +
- "This option or manual-assignment-json-file needs to be specified. The format to use is - \n" +
- "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}")
- .withRequiredArg
- .describedAs("topics to reassign json file path")
- .ofType(classOf[String])
-
- val manualAssignmentJsonFileOpt = parser.accepts("manual-assignment-json-file", "The JSON file with the list of manual reassignments" +
- "This option or topics-to-move-json-file needs to be specified. The format to use is - \n" +
- "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }],\n\"version\":1\n}")
- .withRequiredArg
- .describedAs("manual assignment json file path")
- .ofType(classOf[String])
- val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" +
- " in the form \"0,1,2\". This is required for automatic topic reassignment.")
- .withRequiredArg
- .describedAs("brokerlist")
- .ofType(classOf[String])
+ val opts = new ReassignPartitionsCommandOptions(args)
- val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
- "form host:port. Multiple URLS can be given to allow fail-over.")
- .withRequiredArg
- .describedAs("urls")
- .ofType(classOf[String])
-
- val executeOpt = parser.accepts("execute", "This option does the actual reassignment. By default, the tool does a dry run")
- .withOptionalArg()
- .describedAs("execute")
- .ofType(classOf[String])
-
- val statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
- "new replicas they should be reassigned to, which can be obtained from the output of a dry run.")
- .withRequiredArg
- .describedAs("partition reassignment json file path")
- .ofType(classOf[String])
-
- val options = parser.parse(args : _*)
-
- for(arg <- List(zkConnectOpt)) {
- if(!options.has(arg)) {
- System.err.println("Missing required argument \"" + arg + "\"")
- parser.printHelpOn(System.err)
- System.exit(1)
- }
+ // should have exactly one action
+ val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _)
+ if(actions != 1) {
+ opts.parser.printHelpOn(System.err)
+ Utils.croak("Command must include exactly one action: --generate, --execute or --verify")
}
- if (options.has(topicsToMoveJsonFileOpt) && options.has(manualAssignmentJsonFileOpt)) {
- System.err.println("Only one of the json files should be specified")
- parser.printHelpOn(System.err)
- System.exit(1)
- }
+ CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
- val zkConnect = options.valueOf(zkConnectOpt)
+ val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
try {
-
- var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
-
- if(options.has(statusCheckJsonFileOpt)) {
- val jsonFile = options.valueOf(statusCheckJsonFileOpt)
- val jsonString = Utils.readFileAsString(jsonFile)
- val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
-
- println("Status of partition reassignment:")
- val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
- reassignedPartitionsStatus.foreach { partition =>
- partition._2 match {
- case ReassignmentCompleted =>
- println("Reassignment of partition %s completed successfully".format(partition._1))
- case ReassignmentFailed =>
- println("Reassignment of partition %s failed".format(partition._1))
- case ReassignmentInProgress =>
- println("Reassignment of partition %s is still in progress".format(partition._1))
- }
- }
- } else if(options.has(topicsToMoveJsonFileOpt)) {
- if(!options.has(brokerListOpt)) {
- System.err.println("broker-list is required if topics-to-move-json-file is used")
- parser.printHelpOn(System.err)
- System.exit(1)
- }
- val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
- val brokerListToReassign = options.valueOf(brokerListOpt).split(',').map(_.toInt)
- val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
- val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
- val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
-
- val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
- groupedByTopic.foreach { topicInfo =>
- val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
- topicInfo._2.head._2.size)
- partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
- }
- } else if (options.has(manualAssignmentJsonFileOpt)) {
- val manualAssignmentJsonFile = options.valueOf(manualAssignmentJsonFileOpt)
- val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile)
- partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(manualAssignmentJsonString)
- if (partitionsToBeReassigned.isEmpty)
- throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(manualAssignmentJsonFileOpt))
- } else {
- System.err.println("Missing json file. One of the file needs to be specified")
- parser.printHelpOn(System.err)
- System.exit(1)
- }
-
- if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt)) {
- if (options.has(executeOpt)) {
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
-
- if(reassignPartitionsCommand.reassignPartitions())
- println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
- else
- println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
- } else {
- System.out.println("This is a dry run (Use --execute to do the actual reassignment. " +
- "The following is the replica assignment. Save it for the status check option.\n" +
- ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))
- }
- }
+ if(opts.options.has(opts.verifyOpt))
+ verifyAssignment(zkClient, opts)
+ else if(opts.options.has(opts.generateOpt))
+ generateAssignment(zkClient, opts)
+ else if (opts.options.has(opts.executeOpt))
+ executeAssignment(zkClient, opts)
} catch {
case e: Throwable =>
println("Partitions reassignment failed due to " + e.getMessage)
@@ -157,6 +57,76 @@ object ReassignPartitionsCommand extends Logging {
}
}
+ def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+ if(!opts.options.has(opts.reassignmentJsonFileOpt)) {
+ opts.parser.printHelpOn(System.err)
+ Utils.croak("If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
+ }
+ val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
+ val jsonString = Utils.readFileAsString(jsonFile)
+ val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
+
+ println("Status of partition reassignment:")
+ val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
+ reassignedPartitionsStatus.foreach { partition =>
+ partition._2 match {
+ case ReassignmentCompleted =>
+ println("Reassignment of partition %s completed successfully".format(partition._1))
+ case ReassignmentFailed =>
+ println("Reassignment of partition %s failed".format(partition._1))
+ case ReassignmentInProgress =>
+ println("Reassignment of partition %s is still in progress".format(partition._1))
+ }
+ }
+ }
+
+ def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+ if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt))) {
+ opts.parser.printHelpOn(System.err)
+ Utils.croak("If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
+ }
+ val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
+ val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
+ val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
+ val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+ val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
+
+ var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
+ val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
+ groupedByTopic.foreach { topicInfo =>
+ val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
+ topicInfo._2.head._2.size)
+ partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
+ }
+ val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
+ println("Current partition replica assignment\n\n%s"
+ .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+ println("Proposed partition reassignment configuration\n\n%s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
+ }
+
+ def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+ if(!opts.options.has(opts.reassignmentJsonFileOpt)) {
+ opts.parser.printHelpOn(System.err)
+ Utils.croak("If --execute option is used, command must include --reassignment-json-file that was output " +
+ "during the --generate option")
+ }
+ val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
+ val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
+ val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString)
+ if (partitionsToBeReassigned.isEmpty)
+ throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+ // before starting assignment, output the current replica assignment to facilitate rollback
+ val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
+ println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
+ .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+ // start the reassignment
+ if(reassignPartitionsCommand.reassignPartitions())
+ println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+ else
+ println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+ }
+
private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
:Map[TopicAndPartition, ReassignmentStatus] = {
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
@@ -185,6 +155,39 @@ object ReassignPartitionsCommand extends Logging {
}
}
}
+
+ class ReassignPartitionsCommandOptions(args: Array[String]) {
+ val parser = new OptionParser
+
+ val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
+ "form host:port. Multiple URLS can be given to allow fail-over.")
+ .withRequiredArg
+ .describedAs("urls")
+ .ofType(classOf[String])
+ val generateOpt = parser.accepts("generate", "Generate a candidate partition reassignment configuration." +
+ " Note that this only generates a candidate assignment, it does not execute it.")
+ val executeOpt = parser.accepts("execute", "Kick off the reassignment as specified by the --reassignment-json-file option.")
+ val verifyOpt = parser.accepts("verify", "Verify if the reassignment completed as specified by the --reassignment-json-file option.")
+ val reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", "The JSON file with the partition reassignment configuration" +
+ "The format to use is - \n" +
+ "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }],\n\"version\":1\n}")
+ .withRequiredArg
+ .describedAs("manual assignment json file path")
+ .ofType(classOf[String])
+ val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "Generate a reassignment configuration to move the partitions" +
+ " of the specified topics to the list of brokers specified by the --broker-list option. The format to use is - \n" +
+ "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}")
+ .withRequiredArg
+ .describedAs("topics to reassign json file path")
+ .ofType(classOf[String])
+ val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" +
+ " in the form \"0,1,2\". This is required if --topics-to-move-json-file is used to generate reassignment configuration")
+ .withRequiredArg
+ .describedAs("brokerlist")
+ .ofType(classOf[String])
+
+ val options = parser.parse(args : _*)
+ }
}
class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
http://git-wip-us.apache.org/repos/asf/kafka/blob/ea3961fc/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4c319ab..a1e0f29 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -705,16 +705,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
- // stop watching the ISR changes for this partition
- zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
- controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+ if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
+ // stop watching the ISR changes for this partition
+ zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+ controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+ }
// read the current list of reassigned partitions from zookeeper
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
// remove this partition from that list
val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
// write the new list to zookeeper
ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
- // update the cache
+ // update the cache. NO-OP if the partition's reassignment was never started
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
}