You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/03 20:17:00 UTC

[kafka] branch trunk updated: KAFKA-8383; Integration tests for unclean `electLeaders` (#6857)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 45fae33  KAFKA-8383; Integration tests for unclean `electLeaders` (#6857)
45fae33 is described below

commit 45fae339373fe4bd90fb320695267b8340094fa3
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Mon Jun 3 13:16:38 2019 -0700

    KAFKA-8383; Integration tests for unclean `electLeaders` (#6857)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/api/AdminClientIntegrationTest.scala     | 346 +++++++++++++++++----
 .../integration/UncleanLeaderElectionTest.scala    |   2 +-
 .../unit/kafka/server/LogDirFailureTest.scala      |   1 -
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |   1 -
 4 files changed, 294 insertions(+), 56 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 66689e4..4145137 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -16,22 +16,22 @@
  */
 package kafka.api
 
-import java.{time, util}
-import java.util.{Collections, Properties}
-import java.util.Arrays.asList
-import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.io.File
+import java.lang.{Long => JLong}
+import java.time.{Duration => JDuration}
+import java.util.Arrays.asList
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
-
-import org.apache.kafka.clients.admin.KafkaAdminClientTest
-import org.apache.kafka.common.utils.{Time, Utils}
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
+import java.util.{Collections, Properties}
+import java.{time, util}
 import kafka.log.LogConfig
+import kafka.security.auth.{Cluster, Group, Topic}
 import kafka.server.{Defaults, KafkaConfig, KafkaServer}
-import org.apache.kafka.clients.admin._
-import kafka.utils.{Logging, TestUtils}
-import kafka.utils.TestUtils._
 import kafka.utils.Implicits._
-import org.apache.kafka.clients.admin.NewTopic
+import kafka.utils.TestUtils._
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -42,23 +42,17 @@ import org.apache.kafka.common.TopicPartitionReplica
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
-import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
-import org.junit.rules.Timeout
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.junit.Assert._
+import org.junit.rules.Timeout
+import org.junit.{After, Before, Rule, Test}
 import org.scalatest.Assertions.intercept
-
-import scala.util.Random
 import scala.collection.JavaConverters._
-import kafka.zk.KafkaZkClient
-
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
-import java.lang.{Long => JLong}
-import java.time.{Duration => JDuration}
-
-import kafka.security.auth.{Cluster, Group, Topic}
+import scala.util.Random
 
 /**
  * An integration test of the KafkaAdminClient.
@@ -1271,22 +1265,15 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0)
     TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0), servers)
 
-    def currentLeader(topicPartition: TopicPartition) =
-      client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
-        get.partitions.get(topicPartition.partition).leader.id
-
     def preferredLeader(topicPartition: TopicPartition) =
       client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
         get.partitions.get(topicPartition.partition).replicas.get(0).id
 
-    def waitForLeaderToBecome(topicPartition: TopicPartition, leader: Int) =
-      TestUtils.waitUntilTrue(() => currentLeader(topicPartition) == leader, s"Expected leader to become $leader", 10000)
-
     /** Changes the <i>preferred</i> leader without changing the <i>current</i> leader. */
     def changePreferredLeader(newAssignment: Seq[Int]) = {
       val preferred = newAssignment.head
-      val prior1 = currentLeader(partition1)
-      val prior2 = currentLeader(partition2)
+      val prior1 = currentLeader(client, partition1).get
+      val prior2 = currentLeader(client, partition2).get
 
       var m = Map.empty[TopicPartition, Seq[Int]]
 
@@ -1301,26 +1288,26 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
         s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
         10000)
       // Check the leader hasn't moved
-      assertEquals(prior1, currentLeader(partition1))
-      assertEquals(prior2, currentLeader(partition2))
+      assertEquals(Some(prior1), currentLeader(client, partition1))
+      assertEquals(Some(prior2), currentLeader(client, partition2))
     }
 
     // Check current leaders are 0
-    assertEquals(0, currentLeader(partition1))
-    assertEquals(0, currentLeader(partition2))
+    assertEquals(Some(0), currentLeader(client, partition1))
+    assertEquals(Some(0), currentLeader(client, partition2))
 
     // Noop election
     var electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
     var exception = electResult.partitions.get.get(partition1).get
     assertEquals(classOf[ElectionNotNeededException], exception.getClass)
     assertEquals("Leader election not needed for topic partition", exception.getMessage)
-    assertEquals(0, currentLeader(partition1))
+    assertEquals(Some(0), currentLeader(client, partition1))
 
     // Noop election with null partitions
     electResult = client.electLeaders(ElectionType.PREFERRED, null)
     assertTrue(electResult.partitions.get.isEmpty)
-    assertEquals(0, currentLeader(partition1))
-    assertEquals(0, currentLeader(partition2))
+    assertEquals(Some(0), currentLeader(client, partition1))
+    assertEquals(Some(0), currentLeader(client, partition2))
 
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
@@ -1329,17 +1316,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
     assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
     assertFalse(electResult.partitions.get.get(partition1).isPresent)
-    waitForLeaderToBecome(partition1, 1)
+    waitForLeaderToBecome(client, partition1, Some(1))
 
     // topic 2 unchanged
     assertFalse(electResult.partitions.get.containsKey(partition2))
-    assertEquals(0, currentLeader(partition2))
+    assertEquals(Some(0), currentLeader(client, partition2))
 
     // meaningful election with null partitions
     electResult = client.electLeaders(ElectionType.PREFERRED, null)
     assertEquals(Set(partition2), electResult.partitions.get.keySet.asScala)
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
-    waitForLeaderToBecome(partition2, 1)
+    waitForLeaderToBecome(client, partition2, Some(1))
 
     // unknown topic
     val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
@@ -1348,8 +1335,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     exception = electResult.partitions.get.get(unknownPartition).get
     assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
     assertEquals("The partition does not exist.", exception.getMessage)
-    assertEquals(1, currentLeader(partition1))
-    assertEquals(1, currentLeader(partition2))
+    assertEquals(Some(1), currentLeader(client, partition1))
+    assertEquals(Some(1), currentLeader(client, partition2))
 
     // Now change the preferred leader to 2
     changePreferredLeader(prefer2)
@@ -1357,8 +1344,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     // mixed results
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(unknownPartition, partition1).asJava)
     assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get.keySet)
-    waitForLeaderToBecome(partition1, 2)
-    assertEquals(1, currentLeader(partition2))
+    waitForLeaderToBecome(client, partition1, Some(2))
+    assertEquals(Some(1), currentLeader(client, partition2))
     exception = electResult.partitions.get.get(unknownPartition).get
     assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
     assertEquals("The partition does not exist.", exception.getMessage)
@@ -1367,17 +1354,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition2).asJava)
     assertEquals(Set(partition2).asJava, electResult.partitions.get.keySet)
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
-    waitForLeaderToBecome(partition2, 2)
+    waitForLeaderToBecome(client, partition2, Some(2))
 
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
     // but shut it down...
     servers(1).shutdown()
-    waitUntilTrue (() => {
-      val description = client.describeTopics(Set(partition1.topic, partition2.topic).asJava).all.get
-      val isr = description.asScala.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
-      !isr.exists(_.id == 1)
-    }, "Expect broker 1 to no longer be in any ISR")
+    waitForBrokerOutOfIsr(client, Set(partition1, partition2), 1)
 
     // ... now what happens if we try to elect the preferred leader and it's down?
     val shortTimeout = new ElectLeadersOptions().timeoutMs(10000)
@@ -1387,7 +1370,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
     assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
       "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
-    assertEquals(2, currentLeader(partition1))
+    assertEquals(Some(2), currentLeader(client, partition1))
 
     // preferred leader unavailable with null argument
     electResult = client.electLeaders(ElectionType.PREFERRED, null, shortTimeout)
@@ -1402,8 +1385,235 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
       "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
 
-    assertEquals(2, currentLeader(partition1))
-    assertEquals(2, currentLeader(partition2))
+    assertEquals(Some(2), currentLeader(client, partition1))
+    assertEquals(Some(2), currentLeader(client, partition2))
+  }
+
+  @Test
+  def testElectUncleanLeadersForOnePartition(): Unit = {
+    // Case: unclean leader election with one topic partition
+    client = AdminClient.create(createConfig)
+
+    val broker1 = 1
+    val broker2 = 2
+    val assignment1 = Seq(broker1, broker2)
+
+    val partition1 = new TopicPartition("unclean-test-topic-1", 0)
+    TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> assignment1), servers)
+
+    waitForLeaderToBecome(client, partition1, Option(broker1))
+
+    servers(broker2).shutdown()
+    waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+    servers(broker1).shutdown()
+    waitForLeaderToBecome(client, partition1, None)
+    servers(broker2).startup()
+
+    val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
+    assertFalse(electResult.partitions.get.get(partition1).isPresent)
+    assertEquals(Option(broker2), currentLeader(client, partition1))
+  }
+
+  @Test
+  def testElectUncleanLeadersForManyPartitions(): Unit = {
+    // Case: unclean leader election with many topic partitions
+    client = AdminClient.create(createConfig)
+
+    val broker1 = 1
+    val broker2 = 2
+    val assignment1 = Seq(broker1, broker2)
+    val assignment2 = Seq(broker1, broker2)
+
+    val topic = "unclean-test-topic-1"
+    val partition1 = new TopicPartition(topic, 0)
+    val partition2 = new TopicPartition(topic, 1)
+
+    TestUtils.createTopic(
+      zkClient,
+      topic,
+      Map(partition1.partition -> assignment1, partition2.partition -> assignment2),
+      servers
+    )
+
+    waitForLeaderToBecome(client, partition1, Option(broker1))
+    waitForLeaderToBecome(client, partition2, Option(broker1))
+
+    servers(broker2).shutdown()
+    waitForBrokerOutOfIsr(client, Set(partition1, partition2), broker2)
+    servers(broker1).shutdown()
+    waitForLeaderToBecome(client, partition1, None)
+    waitForLeaderToBecome(client, partition2, None)
+    servers(broker2).startup()
+
+    val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
+    assertFalse(electResult.partitions.get.get(partition1).isPresent)
+    assertFalse(electResult.partitions.get.get(partition2).isPresent)
+    assertEquals(Option(broker2), currentLeader(client, partition1))
+    assertEquals(Option(broker2), currentLeader(client, partition2))
+  }
+
+  @Test
+  def testElectUncleanLeadersForAllPartitions(): Unit = {
+    // Case: noop unclean leader election and valid unclean leader election for all partitions
+    client = AdminClient.create(createConfig)
+
+    val broker1 = 1
+    val broker2 = 2
+    val broker3 = 0
+    val assignment1 = Seq(broker1, broker2)
+    val assignment2 = Seq(broker1, broker3)
+
+    val topic = "unclean-test-topic-1"
+    val partition1 = new TopicPartition(topic, 0)
+    val partition2 = new TopicPartition(topic, 1)
+
+    TestUtils.createTopic(
+      zkClient,
+      topic,
+      Map(partition1.partition -> assignment1, partition2.partition -> assignment2),
+      servers
+    )
+
+    waitForLeaderToBecome(client, partition1, Option(broker1))
+    waitForLeaderToBecome(client, partition2, Option(broker1))
+
+    servers(broker2).shutdown()
+    waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+    servers(broker1).shutdown()
+    waitForLeaderToBecome(client, partition1, None)
+    waitForLeaderToBecome(client, partition2, Some(broker3))
+    servers(broker2).startup()
+
+    val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
+    assertFalse(electResult.partitions.get.get(partition1).isPresent)
+    assertFalse(electResult.partitions.get.containsKey(partition2))
+    assertEquals(Option(broker2), currentLeader(client, partition1))
+    assertEquals(Option(broker3), currentLeader(client, partition2))
+  }
+
+  @Test
+  def testElectUncleanLeadersForUnknownPartitions(): Unit = {
+    // Case: unclean leader election for an unknown partition and an unknown topic
+    client = AdminClient.create(createConfig)
+
+    val broker1 = 1
+    val broker2 = 2
+    val assignment1 = Seq(broker1, broker2)
+
+    val topic = "unclean-test-topic-1"
+    val unknownPartition = new TopicPartition(topic, 1)
+    val unknownTopic = new TopicPartition("unknown-topic", 0)
+
+    TestUtils.createTopic(
+      zkClient,
+      topic,
+      Map(0 -> assignment1),
+      servers
+    )
+
+    waitForLeaderToBecome(client, new TopicPartition(topic, 0), Option(broker1))
+
+    val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(unknownPartition, unknownTopic).asJava)
+    assertTrue(electResult.partitions.get.get(unknownPartition).get.isInstanceOf[UnknownTopicOrPartitionException])
+    assertTrue(electResult.partitions.get.get(unknownTopic).get.isInstanceOf[UnknownTopicOrPartitionException])
+  }
+
+  @Test
+  def testElectUncleanLeadersWhenNoLiveBrokers(): Unit = {
+    // Case: unclean leader election with no live brokers
+    client = AdminClient.create(createConfig)
+
+    val broker1 = 1
+    val broker2 = 2
+    val assignment1 = Seq(broker1, broker2)
+
+    val topic = "unclean-test-topic-1"
+    val partition1 = new TopicPartition(topic, 0)
+
+    TestUtils.createTopic(
+      zkClient,
+      topic,
+      Map(partition1.partition -> assignment1),
+      servers
+    )
+
+    waitForLeaderToBecome(client, partition1, Option(broker1))
+
+    servers(broker2).shutdown()
+    waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+    servers(broker1).shutdown()
+    waitForLeaderToBecome(client, partition1, None)
+
+    val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
+    assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
+  }
+
+  @Test
+  def testElectUncleanLeadersNoop(): Unit = {
+    // Case: noop unclean leader election with explicit topic partitions
+    client = AdminClient.create(createConfig)
+
+    val broker1 = 1
+    val broker2 = 2
+    val assignment1 = Seq(broker1, broker2)
+
+    val topic = "unclean-test-topic-1"
+    val partition1 = new TopicPartition(topic, 0)
+
+    TestUtils.createTopic(
+      zkClient,
+      topic,
+      Map(partition1.partition -> assignment1),
+      servers
+    )
+
+    waitForLeaderToBecome(client, partition1, Option(broker1))
+
+    servers(broker1).shutdown()
+    waitForLeaderToBecome(client, partition1, Some(broker2))
+    servers(broker1).startup()
+
+    val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
+    assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[ElectionNotNeededException])
+  }
+
+  @Test
+  def testElectUncleanLeadersAndNoop(): Unit = {
+    // Case: one noop unclean leader election and one valid unclean leader election
+    client = AdminClient.create(createConfig)
+
+    val broker1 = 1
+    val broker2 = 2
+    val broker3 = 0
+    val assignment1 = Seq(broker1, broker2)
+    val assignment2 = Seq(broker1, broker3)
+
+    val topic = "unclean-test-topic-1"
+    val partition1 = new TopicPartition(topic, 0)
+    val partition2 = new TopicPartition(topic, 1)
+
+    TestUtils.createTopic(
+      zkClient,
+      topic,
+      Map(partition1.partition -> assignment1, partition2.partition -> assignment2),
+      servers
+    )
+
+    waitForLeaderToBecome(client, partition1, Option(broker1))
+    waitForLeaderToBecome(client, partition2, Option(broker1))
+
+    servers(broker2).shutdown()
+    waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+    servers(broker1).shutdown()
+    waitForLeaderToBecome(client, partition1, None)
+    waitForLeaderToBecome(client, partition2, Some(broker3))
+    servers(broker2).startup()
+
+    val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
+    assertFalse(electResult.partitions.get.get(partition1).isPresent)
+    assertTrue(electResult.partitions.get.get(partition2).get.isInstanceOf[ElectionNotNeededException])
+    assertEquals(Option(broker2), currentLeader(client, partition1))
+    assertEquals(Option(broker3), currentLeader(client, partition2))
   }
 
   @Test
@@ -1724,4 +1934,34 @@ object AdminClientIntegrationTest {
     assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
   }
 
+  def currentLeader(client: AdminClient, topicPartition: TopicPartition): Option[Int] = {
+    Option(
+      client
+        .describeTopics(asList(topicPartition.topic))
+        .all
+        .get
+        .get(topicPartition.topic)
+        .partitions
+        .get(topicPartition.partition)
+        .leader
+    ).map(_.id)
+  }
+
+  def waitForLeaderToBecome(client: AdminClient, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
+    TestUtils.waitUntilTrue(
+      () => currentLeader(client, topicPartition) == leader,
+      s"Expected leader to become $leader", 10000
+    )
+  }
+
+  def waitForBrokerOutOfIsr(client: AdminClient, partitions: Set[TopicPartition], brokerId: Int): Unit = {
+    TestUtils.waitUntilTrue(
+      () => {
+        val description = client.describeTopics(partitions.map(_.topic).asJava).all.get.asScala
+        val isr = description.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
+        isr.forall(_.id != brokerId)
+      },
+      s"Expect broker $brokerId to no longer be in any ISR for $partitions"
+    )
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 2974761..c0c8c95 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -18,7 +18,7 @@
 package kafka.integration
 
 import org.apache.kafka.common.config.ConfigException
-import org.junit.{After, Before, Ignore, Test}
+import org.junit.{After, Before, Test}
 
 import scala.util.Random
 import scala.collection.JavaConverters._
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 6b69c41..f83562c 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
 
 import kafka.server.LogDirFailureTest._
 import kafka.api.IntegrationTestHarness
-import kafka.cluster.Partition
 import kafka.controller.{OfflineReplica, PartitionAndReplica}
 import kafka.utils.{CoreUtils, Exit, TestUtils}
 import org.apache.kafka.clients.consumer.KafkaConsumer
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 780d189..6ab3138 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -23,7 +23,6 @@ import TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import java.io.File
 
-import kafka.cluster.Partition
 import kafka.server.checkpoints.OffsetCheckpointFile
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition