You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:15 UTC
[30/36] git commit: KAFKA-990 Fix ReassignPartitionCommand and
improve usability; reviewed by Neha, Jun, Joel and Guozhang
KAFKA-990 Fix ReassignPartitionCommand and improve usability; reviewed by Neha, Jun, Joel and Guozhang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39fc578d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39fc578d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39fc578d
Branch: refs/heads/trunk
Commit: 39fc578d5da3f871fbcd5205de010f1f574507d1
Parents: 7640bee
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Tue Aug 27 22:12:25 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Aug 27 22:12:35 2013 -0700
----------------------------------------------------------------------
.../kafka/admin/ReassignPartitionsCommand.scala | 94 +++++++++++---
.../kafka/controller/KafkaController.scala | 130 +++++++++----------
.../kafka/controller/ReplicaStateMachine.scala | 7 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 18 +++
4 files changed, 162 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 8d287f4..aa61fa1 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -18,6 +18,7 @@ package kafka.admin
import joptsimple.OptionParser
import kafka.utils._
+import collection._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.common.{TopicAndPartition, AdminCommandFailedException}
@@ -26,21 +27,40 @@ object ReassignPartitionsCommand extends Logging {
def main(args: Array[String]): Unit = {
val parser = new OptionParser
- val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
- "new replicas they should be reassigned to in the following format - \n" +
- "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }]\n}")
+ val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "The JSON file with the list of topics to reassign." +
+ "This option or manual-assignment-json-file needs to be specified. The format to use is - \n" +
+ "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}")
.withRequiredArg
- .describedAs("partition reassignment json file path")
+ .describedAs("topics to reassign json file path")
.ofType(classOf[String])
+
+ val manualAssignmentJsonFileOpt = parser.accepts("manual-assignment-json-file", "The JSON file with the list of manual reassignments" +
+ "This option or topics-to-move-json-file needs to be specified. The format to use is - \n" +
+ "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }],\n\"version\":1\n}")
+ .withRequiredArg
+ .describedAs("manual assignment json file path")
+ .ofType(classOf[String])
+
+ val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" +
+ " in the form \"0,1,2\". This is required for automatic topic reassignment.")
+ .withRequiredArg
+ .describedAs("brokerlist")
+ .ofType(classOf[String])
+
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
"form host:port. Multiple URLS can be given to allow fail-over.")
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
+ val executeOpt = parser.accepts("execute", "This option does the actual reassignment. By default, the tool does a dry run")
+ .withOptionalArg()
+ .describedAs("execute")
+ .ofType(classOf[String])
+
val options = parser.parse(args : _*)
- for(arg <- List(jsonFileOpt, zkConnectOpt)) {
+ for(arg <- List(zkConnectOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
@@ -48,24 +68,56 @@ object ReassignPartitionsCommand extends Logging {
}
}
- val jsonFile = options.valueOf(jsonFileOpt)
- val zkConnect = options.valueOf(zkConnectOpt)
- val jsonString = Utils.readFileAsString(jsonFile)
- var zkClient: ZkClient = null
+ if (options.has(topicsToMoveJsonFileOpt) && options.has(manualAssignmentJsonFileOpt)) {
+ System.err.println("Only one of the json files should be specified")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ val zkConnect = options.valueOf(zkConnectOpt)
+ var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
try {
- // read the json file into a string
- val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
- if (partitionsToBeReassigned.isEmpty)
- throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))
-
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
-
- if(reassignPartitionsCommand.reassignPartitions())
- println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
- else
- println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+
+ var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
+
+ if(options.has(topicsToMoveJsonFileOpt)) {
+ val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
+ val brokerList = options.valueOf(brokerListOpt)
+ val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
+ val brokerListToReassign = brokerList.split(',') map (_.toInt)
+ val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+ val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
+
+ val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
+ groupedByTopic.foreach { topicInfo =>
+ val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
+ topicInfo._2.head._2.size)
+ partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
+ }
+
+ } else if (options.has(manualAssignmentJsonFileOpt)) {
+ val manualAssignmentJsonFile = options.valueOf(manualAssignmentJsonFileOpt)
+ val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile)
+ partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(manualAssignmentJsonString)
+ if (partitionsToBeReassigned.isEmpty)
+ throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(manualAssignmentJsonFileOpt))
+ } else {
+ System.err.println("Missing json file. One of the file needs to be specified")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+
+ if (options.has(executeOpt)) {
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+
+ if(reassignPartitionsCommand.reassignPartitions())
+ println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+ else
+ println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+ } else {
+ System.out.println("This is a dry run (Use --execute to do the actual reassignment. " +
+ "The replica assignment is \n" + partitionsToBeReassigned.toString())
+ }
} catch {
case e =>
println("Partitions reassignment failed due to " + e.getMessage)
http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index bde405a..ab18b7a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -356,13 +356,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
* Reassigning replicas for a partition goes through a few stages -
* RAR = Reassigned replicas
* AR = Original list of replicas for partition
- * 1. Register listener for ISR changes to detect when the RAR is a subset of the ISR
- * 2. Start new replicas RAR - AR.
- * 3. Wait until new replicas are in sync with the leader
- * 4. If the leader is not in RAR, elect a new leader from RAR
- * 5. Stop old replicas AR - RAR
- * 6. Write new AR
- * 7. Remove partition from the /admin/reassign_partitions path
+ * 1. Start new replicas RAR - AR.
+ * 2. Wait until new replicas are in sync with the leader
+ * 3. If the leader is not in RAR, elect a new leader from RAR
+ * 4. Stop old replicas AR - RAR
+ * 5. Write new AR
+ * 6. Remove partition from the /admin/reassign_partitions path
*/
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
@@ -395,6 +394,54 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
+ private def watchIsrChangesForReassignedPartition(topic: String,
+ partition: Int,
+ reassignedPartitionContext: ReassignedPartitionsContext) {
+ val reassignedReplicas = reassignedPartitionContext.newReplicas
+ val isrChangeListener = new ReassignedPartitionsIsrChangeListener(this, topic, partition,
+ reassignedReplicas.toSet)
+ reassignedPartitionContext.isrChangeListener = isrChangeListener
+ // register listener on the leader and isr path to wait until they catch up with the current leader
+ zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
+ }
+
+ def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
+ reassignedPartitionContext: ReassignedPartitionsContext) {
+ val newReplicas = reassignedPartitionContext.newReplicas
+ val topic = topicAndPartition.topic
+ val partition = topicAndPartition.partition
+ val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+ try {
+ val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
+ assignedReplicasOpt match {
+ case Some(assignedReplicas) =>
+ if(assignedReplicas == newReplicas) {
+ throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
+ " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
+ } else {
+ if(aliveNewReplicas == newReplicas) {
+ info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
+ // first register ISR change listener
+ watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
+ controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+ onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
+ } else {
+ // some replica in RAR is not alive. Fail partition reassignment
+ throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
+ " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
+ "Failing partition reassignment")
+ }
+ }
+ case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
+ .format(topicAndPartition))
+ }
+ } catch {
+ case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
+ // remove the partition from the admin path to unblock the admin client
+ removePartitionFromReassignedPartitions(topicAndPartition)
+ }
+ }
+
def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
@@ -501,12 +548,17 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
val reassignedPartitions = partitionsBeingReassigned.filter(partition =>
controllerContext.partitionReplicaAssignment(partition._1) == partition._2.newReplicas).map(_._1)
reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
- controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned
- controllerContext.partitionsBeingReassigned --= reassignedPartitions
+ var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
+ partitionsToReassign ++= partitionsBeingReassigned
+ partitionsToReassign --= reassignedPartitions
+
info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
- info("Resuming reassignment of partitions: %s".format(controllerContext.partitionsBeingReassigned.toString()))
- controllerContext.partitionsBeingReassigned.foreach(partition => onPartitionReassignment(partition._1, partition._2))
+ info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString()))
+
+ partitionsToReassign.foreach { topicPartitionToReassign =>
+ initiateReassignReplicasForTopicPartition(topicPartitionToReassign._1, topicPartitionToReassign._2)
+ }
}
private def initializeAndMaybeTriggerPreferredReplicaElection() {
@@ -595,8 +647,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
// stop watching the ISR changes for this partition
zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
- // update the assigned replica list
- controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
}
private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
@@ -795,39 +845,8 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
val newPartitions = partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
newPartitions.foreach { partitionToBeReassigned =>
controllerContext.controllerLock synchronized {
- val topic = partitionToBeReassigned._1.topic
- val partition = partitionToBeReassigned._1.partition
- val newReplicas = partitionToBeReassigned._2
- val topicAndPartition = partitionToBeReassigned._1
- val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
- try {
- val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
- assignedReplicasOpt match {
- case Some(assignedReplicas) =>
- if(assignedReplicas == newReplicas) {
- throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
- " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
- } else {
- if(aliveNewReplicas == newReplicas) {
- info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
- val context = createReassignmentContextForPartition(topic, partition, newReplicas)
- controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
- controller.onPartitionReassignment(topicAndPartition, context)
- } else {
- // some replica in RAR is not alive. Fail partition reassignment
- throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
- " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
- "Failing partition reassignment")
- }
- }
- case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
- .format(topicAndPartition))
- }
- } catch {
- case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
- // remove the partition from the admin path to unblock the admin client
- controller.removePartitionFromReassignedPartitions(topicAndPartition)
- }
+ val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
+ controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
}
}
}
@@ -840,25 +859,6 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
-
- private def createReassignmentContextForPartition(topic: String,
- partition: Int,
- newReplicas: Seq[Int]): ReassignedPartitionsContext = {
- val context = new ReassignedPartitionsContext(newReplicas)
- // first register ISR change listener
- watchIsrChangesForReassignedPartition(topic, partition, context)
- context
- }
-
- private def watchIsrChangesForReassignedPartition(topic: String, partition: Int,
- reassignedPartitionContext: ReassignedPartitionsContext) {
- val reassignedReplicas = reassignedPartitionContext.newReplicas
- val isrChangeListener = new ReassignedPartitionsIsrChangeListener(controller, topic, partition,
- reassignedReplicas.toSet)
- reassignedPartitionContext.isrChangeListener = isrChangeListener
- // register listener on the leader and isr path to wait until they catch up with the current leader
- zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
- }
}
class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 9f752f4..c964857 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -182,7 +182,12 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case None =>
true
}
- else false
+ else {
+ replicaState.put((topic, partition, replicaId), OfflineReplica)
+ stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
+ .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+ false
+ }
case None =>
true
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ba5eacc..ca1ce12 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -629,6 +629,24 @@ object ZkUtils extends Logging {
reassignedPartitions
}
+ def parseTopicsData(jsonData: String): Seq[String] = {
+ var topics = List.empty[String]
+ Json.parseFull(jsonData) match {
+ case Some(m) =>
+ m.asInstanceOf[Map[String, Any]].get("topics") match {
+ case Some(partitionsSeq) =>
+ val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
+ mapPartitionSeq.foreach(p => {
+ val topic = p.get("topic").get.asInstanceOf[String]
+ topics ++= List(topic)
+ })
+ case None =>
+ }
+ case None =>
+ }
+ topics
+ }
+
def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
for (p <- partitionsToBeReassigned) {