You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/26 01:38:44 UTC

kafka git commit: MINOR: Move a few methods from the `ZkUtils` class to the companion object

Repository: kafka
Updated Branches:
  refs/heads/trunk 53651937f -> 32feed25a


MINOR: Move a few methods from the `ZkUtils` class to the companion object

They don't require access to `ZkClient`.

Also include a few obvious clean-ups in `ZkUtils`:
* Remove redundant rethrows and braces
* Use named arguments for booleans

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Gwen Shapira <cs...@gmail.com>

Closes #1775 from ijuma/move-some-zk-utils-methods-to-companion-object


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32feed25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32feed25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32feed25

Branch: refs/heads/trunk
Commit: 32feed25aefdda3a9f2b780d0709a3002777c9df
Parents: 5365193
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Aug 26 00:41:23 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Aug 26 00:41:23 2016 +0100

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala |  16 +--
 .../kafka/controller/KafkaController.scala      |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 132 +++++++++----------
 .../admin/ReassignPartitionsClusterTest.scala   |   9 +-
 4 files changed, 77 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 18f741e..0b32d93 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -62,7 +62,7 @@ object ReassignPartitionsCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
     val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val jsonString = Utils.readFileAsString(jsonFile)
-    val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentData(jsonString)
+    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
 
     println("Status of partition reassignment:")
     val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned)
@@ -89,12 +89,12 @@ object ReassignPartitionsCommand extends Logging {
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
     val disableRackAware = opts.options.has(opts.disableRackAware)
     val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
-    println("Current partition replica assignment\n\n%s".format(zkUtils.formatAsReassignmentJson(currentAssignments)))
-    println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.formatAsReassignmentJson(proposedAssignments)))
+    println("Current partition replica assignment\n\n%s".format(ZkUtils.formatAsReassignmentJson(currentAssignments)))
+    println("Proposed partition reassignment configuration\n\n%s".format(ZkUtils.formatAsReassignmentJson(proposedAssignments)))
   }
 
   def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
-    val topicsToReassign = zkUtils.parseTopicsData(topicsToMoveJsonString)
+    val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
     val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
     if (duplicateTopicsToReassign.nonEmpty)
       throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
@@ -126,7 +126,7 @@ object ReassignPartitionsCommand extends Logging {
 
   def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String) {
 
-    val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
+    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file is empty")
     val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map { case (tp, _) => tp })
@@ -145,10 +145,10 @@ object ReassignPartitionsCommand extends Logging {
     // before starting assignment, output the current replica assignment to facilitate rollback
     val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
-      .format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
+      .format(ZkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
     // start the reassignment
     if (reassignPartitionsCommand.reassignPartitions())
-      println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
+      println("Successfully started reassignment of partitions %s".format(ZkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
     else
       println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
   }
@@ -226,7 +226,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[Top
       val validPartitions = partitions.filter(p => validatePartition(zkUtils, p._1.topic, p._1.partition))
       if (validPartitions.isEmpty) false
       else {
-        val jsonReassignmentData = zkUtils.formatAsReassignmentJson(validPartitions)
+        val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
         zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
         true
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0d6f048..04bd3f4 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1259,7 +1259,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
   def handleDataChange(dataPath: String, data: Object) {
     debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
       .format(dataPath, data))
-    val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString)
+    val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
     val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
       partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 3788ef4..a137da8 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -120,6 +120,61 @@ object ZkUtils {
 
   def getDeleteTopicPath(topic: String): String =
     DeleteTopicsPath + "/" + topic
+
+  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+  def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
+    Json.parseFull(jsonData) match {
+      case Some(m) =>
+        m.asInstanceOf[Map[String, Any]].get("partitions") match {
+          case Some(partitionsSeq) =>
+            partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
+              val topic = p.get("topic").get.asInstanceOf[String]
+              val partition = p.get("partition").get.asInstanceOf[Int]
+              val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
+              TopicAndPartition(topic, partition) -> newReplicas
+            })
+          case None =>
+            Seq.empty
+        }
+      case None =>
+        Seq.empty
+    }
+  }
+
+  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] =
+    parsePartitionReassignmentDataWithoutDedup(jsonData).toMap
+
+  def parseTopicsData(jsonData: String): Seq[String] = {
+    var topics = List.empty[String]
+    Json.parseFull(jsonData) match {
+      case Some(m) =>
+        m.asInstanceOf[Map[String, Any]].get("topics") match {
+          case Some(partitionsSeq) =>
+            val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
+            mapPartitionSeq.foreach(p => {
+              val topic = p.get("topic").get.asInstanceOf[String]
+              topics ++= List(topic)
+            })
+          case None =>
+        }
+      case None =>
+    }
+    topics
+  }
+
+  def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
+    Json.encode(Map(
+      "version" -> 1,
+      "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) =>
+        Map(
+          "topic" -> topic,
+          "partition" -> partition,
+          "replicas" -> replicas
+        )
+      }
+    ))
+  }
+
 }
 
 class ZkUtils(val zkClient: ZkClient,
@@ -337,7 +392,7 @@ class ZkUtils(val zkClient: ZkClient,
     } else acls
 
     if (!zkClient.exists(path))
-      ZkPath.createPersistent(zkClient, path, true, acl) //won't throw NoNodeException or NodeExistsException
+      ZkPath.createPersistent(zkClient, path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
   }
 
   /**
@@ -346,7 +401,7 @@ class ZkUtils(val zkClient: ZkClient,
   private def createParentPath(path: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
     val parentDir = path.substring(0, path.lastIndexOf('/'))
     if (parentDir.length != 0) {
-      ZkPath.createPersistent(zkClient, parentDir, true, acls)
+      ZkPath.createPersistent(zkClient, parentDir, createParents = true, acls)
     }
   }
 
@@ -357,10 +412,9 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       ZkPath.createEphemeral(zkClient, path, data, acls)
     } catch {
-      case e: ZkNoNodeException => {
+      case e: ZkNoNodeException =>
         createParentPath(path)
         ZkPath.createEphemeral(zkClient, path, data, acls)
-      }
     }
   }
 
@@ -372,14 +426,13 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       createEphemeralPath(path, data, acls)
     } catch {
-      case e: ZkNodeExistsException => {
+      case e: ZkNodeExistsException =>
         // this can happen when there is connection loss; make sure the data is what we intend to write
         var storedData: String = null
         try {
           storedData = readData(path)._1
         } catch {
           case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
-          case e2: Throwable => throw e2
         }
         if (storedData == null || storedData != data) {
           info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -388,8 +441,6 @@ class ZkUtils(val zkClient: ZkClient,
           // otherwise, the creation succeeded, return normally
           info(path + " exists with value " + data + " during connection loss; this is ok")
         }
-      }
-      case e2: Throwable => throw e2
     }
   }
 
@@ -400,10 +451,9 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       ZkPath.createPersistent(zkClient, path, data, acls)
     } catch {
-      case e: ZkNoNodeException => {
+      case e: ZkNoNodeException =>
         createParentPath(path)
         ZkPath.createPersistent(zkClient, path, data, acls)
-      }
     }
   }
 
@@ -420,17 +470,14 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.writeData(path, data)
     } catch {
-      case e: ZkNoNodeException => {
+      case e: ZkNoNodeException =>
         createParentPath(path)
         try {
           ZkPath.createPersistent(zkClient, path, data, acls)
         } catch {
           case e: ZkNodeExistsException =>
             zkClient.writeData(path, data)
-          case e2: Throwable => throw e2
         }
-      }
-      case e2: Throwable => throw e2
     }
   }
 
@@ -493,11 +540,9 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.writeData(path, data)
     } catch {
-      case e: ZkNoNodeException => {
+      case e: ZkNoNodeException =>
         createParentPath(path)
         ZkPath.createEphemeral(zkClient, path, data, acls)
-      }
-      case e2: Throwable => throw e2
     }
   }
 
@@ -509,7 +554,6 @@ class ZkUtils(val zkClient: ZkClient,
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
         false
-      case e2: Throwable => throw e2
     }
   }
 
@@ -533,7 +577,6 @@ class ZkUtils(val zkClient: ZkClient,
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
-      case e2: Throwable => throw e2
     }
   }
 
@@ -544,13 +587,12 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   def readDataMaybeNull(path: String): (Option[String], Stat) = {
-    val stat: Stat = new Stat()
+    val stat = new Stat()
     val dataAndStat = try {
                         (Some(zkClient.readData(path, stat)), stat)
                       } catch {
                         case e: ZkNoNodeException =>
                           (None, stat)
-                        case e2: Throwable => throw e2
                       }
     dataAndStat
   }
@@ -568,7 +610,6 @@ class ZkUtils(val zkClient: ZkClient,
       zkClient.getChildren(path)
     } catch {
       case e: ZkNoNodeException => Nil
-      case e2: Throwable => throw e2
     }
   }
 
@@ -668,53 +709,6 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }
 
-  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
-  def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
-    Json.parseFull(jsonData) match {
-      case Some(m) =>
-        m.asInstanceOf[Map[String, Any]].get("partitions") match {
-          case Some(partitionsSeq) =>
-            partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
-              val topic = p.get("topic").get.asInstanceOf[String]
-              val partition = p.get("partition").get.asInstanceOf[Int]
-              val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
-              TopicAndPartition(topic, partition) -> newReplicas
-            })
-          case None =>
-            Seq.empty
-        }
-      case None =>
-        Seq.empty
-    }
-  }
-
-  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
-    parsePartitionReassignmentDataWithoutDedup(jsonData).toMap
-  }
-
-  def parseTopicsData(jsonData: String): Seq[String] = {
-    var topics = List.empty[String]
-    Json.parseFull(jsonData) match {
-      case Some(m) =>
-        m.asInstanceOf[Map[String, Any]].get("topics") match {
-          case Some(partitionsSeq) =>
-            val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
-            mapPartitionSeq.foreach(p => {
-              val topic = p.get("topic").get.asInstanceOf[String]
-              topics ++= List(topic)
-            })
-          case None =>
-        }
-      case None =>
-    }
-    topics
-  }
-
-  def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
-    Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition,
-                                                                                          "replicas" -> e._2))))
-  }
-
   def updatePartitionReassignmentData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
     val zkPath = ReassignPartitionsPath
     partitionsToBeReassigned.size match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index ac2c1ae..791c4d2 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -16,10 +16,11 @@ import kafka.admin.ReassignPartitionsCommand
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
 import kafka.utils.ZkUtils._
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.{CoreUtils, Logging, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.{After, Before, Test}
 import org.junit.Assert.assertEquals
+
 import scala.collection.Seq
 
 
@@ -73,7 +74,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When rebalancing
     val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
-    ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment))
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
     waitForReasignmentToComplete()
 
     //Then the replicas should span all three brokers
@@ -94,7 +95,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When rebalancing
     val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
-    ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment))
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
     waitForReasignmentToComplete()
 
     //Then replicas should only span the first two brokers
@@ -103,7 +104,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   }
 
   def waitForReasignmentToComplete() {
-    waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode $zkUtils.ReassignPartitionsPath wasn't deleted")
+    waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode ${ZkUtils.ReassignPartitionsPath} wasn't deleted")
   }
 
   def json(topic: String): String = {