You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/06 13:32:51 UTC

kafka git commit: KAFKA-4266; ReassignPartitionsClusterTest: ensure ZK publication is completed before start

Repository: kafka
Updated Branches:
  refs/heads/trunk 79f85039d -> 63010cbfe


KAFKA-4266; ReassignPartitionsClusterTest: ensure ZK publication is completed before start

Increase the reliability of the one temporal comparison in ReassignPartitionsClusterTest by imposing a delay after ZK is updated. This should be more reliable than just increasing the amount of data.

This relates to a previous PR: https://github.com/apache/kafka/pull/1982

Author: Ben Stopford <be...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #1997 from benstopford/KAFKA-4266


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63010cbf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63010cbf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63010cbf

Branch: refs/heads/trunk
Commit: 63010cbfe5d07df5db060888afd348fab5cbe62c
Parents: 79f8503
Author: Ben Stopford <be...@gmail.com>
Authored: Mon Mar 6 12:42:43 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Mar 6 13:12:28 2017 +0000

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala | 37 ++++++++-----
 .../other/kafka/ReplicationQuotasTestRig.scala  |  3 +-
 .../admin/ReassignPartitionsClusterTest.scala   | 55 +++++++++++---------
 .../admin/ReassignPartitionsCommandTest.scala   |  7 +--
 .../kafka/server/ReplicationQuotasTest.scala    |  1 -
 5 files changed, 60 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/63010cbf/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 4e7b4e0..c167633 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -17,8 +17,9 @@
 package kafka.admin
 
 import joptsimple.OptionParser
-import kafka.server.{DynamicConfig, ConfigType}
+import kafka.server.{ConfigType, DynamicConfig}
 import kafka.utils._
+
 import scala.collection._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{AdminCommandFailedException, TopicAndPartition}
@@ -29,6 +30,10 @@ import org.apache.kafka.common.security.JaasUtils
 
 object ReassignPartitionsCommand extends Logging {
 
+  case class Throttle(value: Long, postUpdateAction: () => Unit = () => ())
+
+  private[admin] val NoThrottle = Throttle(-1)
+
   def main(args: Array[String]): Unit = {
 
     val opts = validateAndParseArgs(args)
@@ -146,10 +151,10 @@ object ReassignPartitionsCommand extends Logging {
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
     val throttle = if (opts.options.has(opts.throttleOpt)) opts.options.valueOf(opts.throttleOpt) else -1
-    executeAssignment(zkUtils, reassignmentJsonString, throttle)
+    executeAssignment(zkUtils, reassignmentJsonString, Throttle(throttle))
   }
 
-  def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) {
+  def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Throttle) {
     val partitionsToBeReassigned = parseAndValidate(zkUtils, reassignmentJsonString)
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
 
@@ -160,7 +165,7 @@ object ReassignPartitionsCommand extends Logging {
     }
     else {
       printCurrentAssignment(zkUtils, partitionsToBeReassigned)
-      if (throttle >= 0)
+      if (throttle.value >= 0)
         println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
       if (reassignPartitionsCommand.reassignPartitions(throttle)) {
         println("Successfully started reassignment of partitions.")
@@ -178,7 +183,7 @@ object ReassignPartitionsCommand extends Logging {
 
   def parseAndValidate(zkUtils: ZkUtils, reassignmentJsonString: String): Seq[(TopicAndPartition, Seq[Int])] = {
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
-    
+
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file is empty")
     val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map { case (tp, _) => tp })
@@ -306,15 +311,19 @@ object ReassignPartitionsCommand extends Logging {
 class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicAndPartition, Seq[Int]], admin: AdminUtilities = AdminUtils)
   extends Logging {
 
+  import ReassignPartitionsCommand._
+
   def existingAssignment(): Map[TopicAndPartition, Seq[Int]] = {
     val proposedTopics = proposedAssignment.keySet.map(_.topic).toSeq
     zkUtils.getReplicaAssignmentForTopics(proposedTopics)
   }
 
-  private def maybeThrottle(throttle: Long): Unit = {
-    if (throttle >= 0) {
-      maybeLimit(throttle)
+  private def maybeThrottle(throttle: Throttle): Unit = {
+    if (throttle.value >= 0) {
       assignThrottledReplicas(existingAssignment(), proposedAssignment)
+      maybeLimit(throttle)
+      throttle.postUpdateAction()
+      println(s"The throttle limit was set to ${throttle.value} B/s")
     }
   }
 
@@ -322,19 +331,18 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
     * Limit the throttle on currently moving replicas. Note that this command can use used to alter the throttle, but
     * it may not alter all limits originally set, if some of the brokers have completed their rebalance.
     */
-  def maybeLimit(throttle: Long) {
-    if (throttle >= 0) {
+  def maybeLimit(throttle: Throttle) {
+    if (throttle.value >= 0) {
       val existingBrokers = existingAssignment().values.flatten.toSeq
       val proposedBrokers = proposedAssignment.values.flatten.toSeq
       val brokers = (existingBrokers ++ proposedBrokers).distinct
 
       for (id <- brokers) {
         val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Broker, id.toString)
-        configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString)
-        configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.toString)
+        configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.value.toString)
+        configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.value.toString)
         admin.changeBrokerConfig(zkUtils, Seq(id), configs)
       }
-      println(s"The throttle limit was set to $throttle B/s")
     }
   }
 
@@ -384,7 +392,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
       allProposed.filter { case (tp, _) => tp.topic == topic })
   }
 
-  def reassignPartitions(throttle: Long = -1): Boolean = {
+  def reassignPartitions(throttle: Throttle = NoThrottle): Boolean = {
     maybeThrottle(throttle)
     try {
       val validPartitions = proposedAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
@@ -422,6 +430,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
   }
 }
 
+
 sealed trait ReassignmentStatus { def status: Int }
 case object ReassignmentCompleted extends ReassignmentStatus { val status = 1 }
 case object ReassignmentInProgress extends ReassignmentStatus { val status = 0 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/63010cbf/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 71a5091..38d07ba 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, PrintWriter}
 import javax.imageio.ImageIO
 
 import kafka.admin.ReassignPartitionsCommand
+import kafka.admin.ReassignPartitionsCommand.Throttle
 import kafka.common.TopicAndPartition
 import org.apache.kafka.common.TopicPartition
 import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
@@ -139,7 +140,7 @@ object ReplicationQuotasTestRig {
       val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
 
       val start = System.currentTimeMillis()
-      ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), config.throttle)
+      ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(config.throttle))
 
       //Await completion
       waitForReassignmentToComplete()

http://git-wip-us.apache.org/repos/asf/kafka/blob/63010cbf/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index c211c24..c576a5c 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -12,6 +12,7 @@
   */
 package kafka.admin
 
+import kafka.admin.ReassignPartitionsCommand._
 import kafka.common.{AdminCommandFailedException, TopicAndPartition}
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
@@ -21,14 +22,15 @@ import kafka.zk.ZooKeeperTestHarness
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{After, Before, Test}
 import kafka.admin.ReplicationQuotaUtils._
-
-import scala.collection.{Map, Seq}
-
+import scala.collection.Map
+import scala.collection.Seq
 
 class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   val partitionId = 0
   var servers: Seq[KafkaServer] = null
   val topicName = "my-topic"
+  val delayMs = 1000
+  def zkUpdateDelay = {Thread.sleep(delayMs)}
 
   @Before
   override def setUp() {
@@ -55,7 +57,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     createTopic(zkUtils, topicName, Map(partition -> Seq(100)), servers = servers)
 
     //When we move the replica on 100 to broker 101
-    ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}""")
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
     waitForReassignmentToComplete()
 
     //Then the replica should be on 101
@@ -74,8 +77,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     ), servers = servers)
 
     //When rebalancing
-    val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+    val newAssignment = generateAssignment(zkUtils, brokers, json(topicName), true)._1
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), NoThrottle)
     waitForReassignmentToComplete()
 
     //Then the replicas should span all three brokers
@@ -95,8 +98,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     ), servers = servers)
 
     //When rebalancing
-    val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+    val newAssignment = generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), NoThrottle)
     waitForReassignmentToComplete()
 
     //Then replicas should only span the first two brokers
@@ -127,7 +130,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     )
 
     //When rebalancing
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(proposed))
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(proposed), NoThrottle)
     waitForReassignmentToComplete()
 
     //Then the proposed changes should have been made
@@ -142,6 +145,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def shouldExecuteThrottledReassignment() {
+
     //Given partitions on 3 of 3 brokers
     val brokers = Array(100, 101, 102)
     startBrokers(brokers)
@@ -150,33 +154,35 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     ), servers = servers)
 
     //Given throttle set so replication will take a certain number of secs
-    val initialThrottle: Long = 10 * 1000 * 1000
+    val initialThrottle = Throttle(10 * 1000 * 1000, () => zkUpdateDelay)
     val expectedDurationSecs = 5
     val numMessages: Int = 500
     val msgSize: Int = 100 * 1000
     produceMessages(servers, topicName, numMessages, acks = 0, msgSize)
-    assertEquals(expectedDurationSecs, numMessages * msgSize / initialThrottle)
+    assertEquals(expectedDurationSecs, numMessages * msgSize / initialThrottle.value)
 
     //Start rebalance which will move replica on 100 -> replica on 102
-    val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
+    val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
 
     val start = System.currentTimeMillis()
     ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
 
     //Check throttle config. Should be throttling replica 0 on 100 and 102 only.
-    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
+    checkThrottleConfigAddedToZK(initialThrottle.value, servers, topicName, "0:100,0:101", "0:102")
 
     //Await completion
     waitForReassignmentToComplete()
-    val took = System.currentTimeMillis() - start
+    val took = System.currentTimeMillis() - start - delayMs
 
     //Check move occurred
     val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
     assertEquals(Seq(101, 102), actual.values.flatten.toSeq.distinct.sorted)
 
     //Then command should have taken longer than the throttle rate
-    assertTrue(s"Expected replication to be > ${expectedDurationSecs * 0.9 * 1000} but was $took", took > expectedDurationSecs * 0.9 * 1000)
-    assertTrue(s"Expected replication to be < ${expectedDurationSecs * 2 * 1000} but was $took", took < expectedDurationSecs * 2 * 1000)
+    assertTrue(s"Expected replication to be > ${expectedDurationSecs * 0.9 * 1000} but was $took",
+      took > expectedDurationSecs * 0.9 * 1000)
+    assertTrue(s"Expected replication to be < ${expectedDurationSecs * 2 * 1000} but was $took",
+      took < expectedDurationSecs * 2 * 1000)
   }
 
 
@@ -211,7 +217,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
       TopicAndPartition("topic1", 2) -> Seq(103, 104), //didn't move
       TopicAndPartition("topic2", 2) -> Seq(103, 104)  //didn't move
     )
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), throttle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(throttle))
 
     //Check throttle config. Should be throttling specific replicas for each topic.
     checkThrottleConfigAddedToZK(throttle, servers, "topic1",
@@ -238,15 +244,15 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     produceMessages(servers, topicName, numMessages = 200, acks = 0, valueBytes = 100 * 1000)
 
     //Start rebalance
-    val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
+    val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
 
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(initialThrottle))
 
     //Check throttle config
     checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
 
     //Ensure that running Verify, whilst the command is executing, should have no effect
-    ReassignPartitionsCommand.verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+    verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
 
     //Check throttle config again
     checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
@@ -254,7 +260,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     //Now re-run the same assignment with a larger throttle, which should only act to increase the throttle and make progress
     val newThrottle = initialThrottle * 1000
 
-    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), newThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(newThrottle))
 
     //Check throttle was changed
     checkThrottleConfigAddedToZK(newThrottle, servers, topicName, "0:100,0:101", "0:102")
@@ -263,7 +269,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     waitForReassignmentToComplete()
 
     //Verify should remove the throttle
-    ReassignPartitionsCommand.verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+    verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
 
     //Check removed
     checkThrottleConfigRemovedFromZK(topicName, servers)
@@ -280,12 +286,13 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
 
     //When we execute an assignment that includes an invalid partition (1:101 in this case)
-    ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}""")
+    val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
   }
 
   @Test
   def shouldPerformThrottledReassignmentOverVariousTopics() {
-    val throttle = 1000L
+    val throttle = Throttle(1000L)
 
     //Given four brokers
     servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(conf => TestUtils.createServer(KafkaConfig.fromProps(conf)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/63010cbf/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 5ecc19b..9e23983 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -18,6 +18,7 @@ package kafka.admin
 
 import java.util.Properties
 
+import kafka.admin.ReassignPartitionsCommand.Throttle
 import kafka.common.TopicAndPartition
 import kafka.log.LogConfig
 import kafka.log.LogConfig._
@@ -248,7 +249,7 @@ class ReassignPartitionsCommandTest extends Logging {
     replay(admin)
 
     //When
-    assigner.maybeLimit(1000)
+    assigner.maybeLimit(Throttle(1000))
 
     //Then
     for (actual <- propsCapture.getValues) {
@@ -282,7 +283,7 @@ class ReassignPartitionsCommandTest extends Logging {
     replay(admin)
 
     //When
-    assigner.maybeLimit(1000)
+    assigner.maybeLimit(Throttle(1000))
 
     //Then
     for (actual <- propsCapture.getValues) {
@@ -312,7 +313,7 @@ class ReassignPartitionsCommandTest extends Logging {
     replay(admin)
 
     //When
-    assigner.maybeLimit(1000)
+    assigner.maybeLimit(Throttle(1000))
 
     //Then other property remains
     for (actual <- propsCapture.getValues) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/63010cbf/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 984d340..b6e3607 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -31,7 +31,6 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
-
 import scala.collection.JavaConverters._
 
 /**