You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/06 21:54:57 UTC

git commit: Standardize Zk data structures for Re-assign partitions and Preferred replication election; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-779

Updated Branches:
  refs/heads/0.8 2457bc49e -> eae1bd52e


Standardize Zk data structures for Re-assign partitions and Preferred replication election; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-779


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eae1bd52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eae1bd52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eae1bd52

Branch: refs/heads/0.8
Commit: eae1bd52ef1d991834151898e768e57de8bb07bf
Parents: 2457bc4
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Wed Mar 6 12:54:39 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 6 12:54:39 2013 -0800

----------------------------------------------------------------------
 .../PreferredReplicaLeaderElectionCommand.scala    |   32 +++++---
 .../kafka/admin/ReassignPartitionsCommand.scala    |   23 ++----
 .../src/main/scala/kafka/api/ProducerRequest.scala |    1 -
 .../consumer/ZookeeperConsumerConnector.scala      |    4 +-
 .../main/scala/kafka/tools/KafkaMigrationTool.java |    4 +-
 core/src/main/scala/kafka/utils/Utils.scala        |   71 ++++++---------
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   51 +++++++----
 .../test/scala/unit/kafka/utils/UtilsTest.scala    |    3 +-
 8 files changed, 94 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index faf0e3e..e59d5af 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -21,6 +21,8 @@ import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{TopicAndPartition, AdminCommandFailedException}
+import collection._
+import mutable.ListBuffer
 
 object PreferredReplicaLeaderElectionCommand extends Logging {
 
@@ -28,7 +30,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     val parser = new OptionParser
     val jsonFileOpt = parser.accepts("path-to-json-file", "The JSON file with the list of partitions " +
       "for which preferred replica leader election should be done, in the following format - \n" +
-       "[{\"topic\": \"foo\", \"partition\": \"1\"}, {\"topic\": \"foobar\", \"partition\": \"2\"}]. \n" +
+       "{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\n" +
       "Defaults to all existing partitions")
       .withRequiredArg
       .describedAs("list of partitions for which preferred replica leader election needs to be triggered")
@@ -67,14 +69,18 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     }
   }
 
-  def parsePreferredReplicaJsonData(jsonString: String): Set[TopicAndPartition] = {
+  def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = {
     Json.parseFull(jsonString) match {
-      case Some(partitionList) =>
-        val partitions = (partitionList.asInstanceOf[List[Any]])
-        Set.empty[TopicAndPartition] ++ partitions.map { m =>
-          val topic = m.asInstanceOf[Map[String, String]].get("topic").get
-          val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
-          TopicAndPartition(topic, partition)
+      case Some(m) =>
+        m.asInstanceOf[Map[String, Any]].get("partitions") match {
+          case Some(partitionsList) =>
+            val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]]
+            partitions.map { p =>
+              val topic = p.get("topic").get.asInstanceOf[String]
+              val partition = p.get("partition").get.asInstanceOf[Int]
+              TopicAndPartition(topic, partition)
+            }.toSet
+          case None => throw new AdministrationException("Preferred replica election data is empty")
         }
       case None => throw new AdministrationException("Preferred replica election data is empty")
     }
@@ -83,9 +89,13 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
   def writePreferredReplicaElectionData(zkClient: ZkClient,
                                         partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
     val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
-    val jsonData = Utils.seqToJson(partitionsUndergoingPreferredReplicaElection.map { p =>
-      Utils.mapToJson(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)), valueInQuotes = true)
-    }.toSeq.sorted, valueInQuotes = false)
+    var partitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
+    for (p <- partitionsUndergoingPreferredReplicaElection) {
+      partitionsData += Utils.mergeJsonFields(Utils.mapToJsonFields(Map("topic" -> p.topic), valueInQuotes = true) ++
+                                               Utils.mapToJsonFields(Map("partition" -> p.partition.toString), valueInQuotes = false))
+    }
+    val jsonPartitionsData = Utils.seqToJson(partitionsData, valueInQuotes = false)
+    val jsonData = Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonPartitionsData), valueInQuotes = false)
     try {
       ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
       info("Created preferred replica election path with %s".format(jsonData))

http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index d3845cc..8d287f4 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -28,7 +28,7 @@ object ReassignPartitionsCommand extends Logging {
     val parser = new OptionParser
     val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
       "new replicas they should be reassigned to in the following format - \n" +
-       "[{\"topic\": \"foo\", \"partition\": \"1\", \"replicas\": \"1,2,3\" }]")
+       "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 1,\n\t  \"replicas\": [1,2,3] }]\n}")
       .withRequiredArg
       .describedAs("partition reassignment json file path")
       .ofType(classOf[String])
@@ -55,18 +55,9 @@ object ReassignPartitionsCommand extends Logging {
 
     try {
       // read the json file into a string
-      val partitionsToBeReassigned = Json.parseFull(jsonString) match {
-        case Some(reassignedPartitions) =>
-          val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
-          partitions.map { m =>
-            val topic = m.asInstanceOf[Map[String, String]].get("topic").get
-            val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
-            val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
-            val newReplicas = replicasList.split(",").map(_.toInt)
-            (TopicAndPartition(topic, partition), newReplicas.toSeq)
-          }.toMap
-        case None => throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))
-      }
+      val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
+      if (partitionsToBeReassigned.isEmpty)
+        throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))
 
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
       val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
@@ -86,15 +77,15 @@ object ReassignPartitionsCommand extends Logging {
   }
 }
 
-class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.immutable.Map[TopicAndPartition, Seq[Int]])
+class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
   extends Logging {
   def reassignPartitions(): Boolean = {
     try {
       val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic, p._1.partition))
-      val jsonReassignmentData = Utils.mapWithSeqValuesToJson(validPartitions.map(p => ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2))
+      val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions)
       ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
       true
-    }catch {
+    } catch {
       case ze: ZkNodeExistsException =>
         val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
         throw new AdminCommandFailedException("Partition reassignment currently in " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 1e05d7e..fda3e39 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -19,7 +19,6 @@ package kafka.api
 
 import java.nio._
 import kafka.message._
-import scala.collection.Map
 import kafka.api.ApiUtils._
 import kafka.common._
 import kafka.network.RequestChannel.Response

http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c9e4127..b266f3f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -213,8 +213,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
     info("begin registering consumer " + consumerIdString + " in ZK")
     val consumerRegistrationInfo =
-      Utils.mergeJsonObjects(Seq(Utils.mapToJson(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false),
-                                 Utils.mapToJson(Map("pattern" -> topicCount.pattern), valueInQuotes = true)))
+      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
+                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
     createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
     info("end registering consumer " + consumerIdString + " in ZK")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index f3a5095..7f0d1ce 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -35,7 +35,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index fe4c925..c639efb 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -465,27 +465,42 @@ object Utils extends Logging {
   def nullOrEmpty(s: String): Boolean = s == null || s.equals("")
 
   /**
-   * Format a Map[String, String] as JSON object.
+   * Merge JSON fields of the format "key" : value/object/array.
    */
-  def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = {
+  def mergeJsonFields(objects: Seq[String]): String = {
     val builder = new StringBuilder
     builder.append("{ ")
-    var numElements = 0
-    for ( (key, value) <- jsonDataMap.toList.sorted) {
-      if (numElements > 0)
-        builder.append(", ")
+    builder.append(objects.sorted.map(_.trim).mkString(", "))
+    builder.append(" }")
+    builder.toString
+  }
+
+ /**
+   * Format a Map[String, String] as JSON object.
+   */
+  def mapToJsonFields(jsonDataMap: Map[String, String], valueInQuotes: Boolean): Seq[String] = {
+    val jsonFields: mutable.ListBuffer[String] = ListBuffer()
+    val builder = new StringBuilder
+    for ((key, value) <- jsonDataMap.toList.sorted) {
       builder.append("\"" + key + "\":")
       if (valueInQuotes)
         builder.append("\"" + value + "\"")
       else
         builder.append(value)
-      numElements += 1
+      jsonFields += builder.toString
+      builder.clear()
     }
-    builder.append(" }")
-    builder.toString
+    jsonFields
   }
 
   /**
+   * Format a Map[String, String] as JSON object.
+   */
+  def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = {
+    mergeJsonFields(mapToJsonFields(jsonDataMap, valueInQuotes))
+  }
+
+   /**
    * Format a Seq[String] as JSON array.
    */
   def seqToJson(jsonData: Seq[String], valueInQuotes: Boolean): String = {
@@ -504,45 +519,11 @@ object Utils extends Logging {
    */
 
   def mapWithSeqValuesToJson(jsonDataMap: Map[String, Seq[Int]]): String = {
-    val builder = new StringBuilder
-    builder.append("{ ")
-    var numElements = 0
-    for ((key, value) <- jsonDataMap.toList.sortBy(_._1)) {
-      if (numElements > 0)
-        builder.append(", ")
-      builder.append("\"" + key + "\": ")
-      builder.append(Utils.seqToJson(value.map(_.toString), valueInQuotes = false))
-      numElements += 1
-    }
-    builder.append(" }")
-    builder.toString
-  }
-
-
-  /**
-   * Merge arbitrary JSON objects.
-   */
-  def mergeJsonObjects(objects: Seq[String]): String = {
-    val builder = new StringBuilder
-    builder.append("{ ")
-    var obs = List[String]()
-    objects.foreach(ob => obs = obs ::: getJsonContents(ob).split(',').toList)
-    obs = obs.sorted.map(_.trim)
-    builder.append(obs.mkString(", "))
-    builder.append(" }")
-    builder.toString
+    mergeJsonFields(mapToJsonFields(jsonDataMap.map(e => (e._1 -> seqToJson(e._2.map(_.toString), valueInQuotes = false))),
+                                    valueInQuotes = false))
   }
 
   /**
-   * Get the contents of a JSON object or array.
-   */
-  def getJsonContents(str: String): String = {
-    str.trim().substring(1, str.length - 1)
-  }
-
-
-
-  /**
    * Create a circular (looping) iterator over a collection.
    * @param coll An iterable over the underlying collection.
    * @return A circular iterator over the collection.

http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f0aba12..9a0e250 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -24,6 +24,7 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import collection._
 import kafka.api.LeaderAndIsr
+import mutable.ListBuffer
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
 import kafka.admin._
@@ -183,8 +184,9 @@ object ZkUtils extends Logging {
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val brokerInfo =
-      Utils.mergeJsonObjects(Seq(Utils.mapToJson(Map("host" -> host), valueInQuotes = true),
-                                 Utils.mapToJson(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString), valueInQuotes = false)))
+      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
+                             Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString),
+                                                   valueInQuotes = false))
     try {
       createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
     } catch {
@@ -209,7 +211,7 @@ object ZkUtils extends Logging {
    * Get JSON partition to replica map from zookeeper.
    */
   def replicaAssignmentZkdata(map: Map[String, Seq[Int]]): String = {
-    val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map.map(e => (e._1.toString -> e._2)))
+    val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map)
     Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonReplicaAssignmentMap), valueInQuotes = false)
   }
 
@@ -559,26 +561,41 @@ object ZkUtils extends Logging {
     jsonPartitionMapOpt match {
       case Some(jsonPartitionMap) =>
         val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
-        reassignedPartitions.map { p =>
-          val newReplicas = p._2
-          (p._1 -> new ReassignedPartitionsContext(newReplicas))
-        }
+        reassignedPartitions.map(p => (p._1 -> new ReassignedPartitionsContext(p._2)))
       case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext]
     }
   }
 
-  def parsePartitionReassignmentData(jsonData: String):Map[TopicAndPartition, Seq[Int]] = {
+  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
+    val reassignedPartitions: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map()
     Json.parseFull(jsonData) match {
       case Some(m) =>
-        val replicaMap = m.asInstanceOf[Map[String, Seq[Int]]]
-        replicaMap.map { reassignedPartitions =>
-          val topic = reassignedPartitions._1.split(",").head.trim
-          val partition = reassignedPartitions._1.split(",").last.trim.toInt
-          val newReplicas = reassignedPartitions._2
-          TopicAndPartition(topic, partition) -> newReplicas
+        m.asInstanceOf[Map[String, Any]].get("partitions") match {
+          case Some(partitionsSeq) =>
+            partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].foreach(p => {
+              val topic = p.get("topic").get.asInstanceOf[String]
+              val partition = p.get("partition").get.asInstanceOf[Int]
+              val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
+              reassignedPartitions += TopicAndPartition(topic, partition) -> newReplicas
+            })
+          case None =>
         }
-      case None => Map.empty[TopicAndPartition, Seq[Int]]
+      case None =>
+    }
+    reassignedPartitions
+  }
+
+  def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
+    var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
+    for (p <- partitionsToBeReassigned) {
+      val jsonReplicasData = Utils.seqToJson(p._2.map(_.toString), valueInQuotes = false)
+      val jsonTopicData = Utils.mapToJsonFields(Map("topic" -> p._1.topic), valueInQuotes = true)
+      val jsonPartitionData = Utils.mapToJsonFields(Map("partition" -> p._1.partition.toString, "replicas" -> jsonReplicasData),
+                                                    valueInQuotes = false)
+      jsonPartitionsData += Utils.mergeJsonFields(jsonTopicData ++ jsonPartitionData)
     }
+    Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> Utils.seqToJson(jsonPartitionsData.toSeq, valueInQuotes = false)),
+                    valueInQuotes = false)
   }
 
   def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
@@ -588,11 +605,11 @@ object ZkUtils extends Logging {
         deletePath(zkClient, zkPath)
         info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
       case _ =>
-        val jsonData = Utils.mapWithSeqValuesToJson(partitionsToBeReassigned.map(p => ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2))
+        val jsonData = getPartitionReassignmentZkData(partitionsToBeReassigned)
         try {
           updatePersistentPath(zkClient, zkPath, jsonData)
           info("Updated partition reassignment path with %s".format(jsonData))
-        }catch {
+        } catch {
           case nne: ZkNoNodeException =>
             ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
             debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))

http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 0b6244f..6b21554 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -23,8 +23,7 @@ import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
 import kafka.common.KafkaException
-import org.junit.{Test}
-import kafka.tools.KafkaMigrationTool
+import org.junit.Test
 
 
 class UtilsTest extends JUnitSuite {