You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/25 23:31:37 UTC

git commit: kafka-1395; fix unit tests in AutoOffsetResetTest; patched by Guozhang Wang; reviewed by Neha Narkhede, Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk c9bb24f15 -> 8a0314d01


kafka-1395; fix unit tests in AutoOffsetResetTest; patched by Guozhang Wang; reviewed by Neha Narkhede, Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8a0314d0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8a0314d0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8a0314d0

Branch: refs/heads/trunk
Commit: 8a0314d01786541f06d0eb41b2e81c8bb1d45a8d
Parents: c9bb24f
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri Apr 25 14:31:33 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Apr 25 14:31:33 2014 -0700

----------------------------------------------------------------------
 .../kafka/integration/AutoOffsetResetTest.scala | 61 ++++----------
 .../unit/kafka/integration/FetcherTest.scala    |  6 +-
 .../ProducerConsumerTestHarness.scala           | 12 ++-
 .../integration/UncleanLeaderElectionTest.scala |  7 +-
 .../ZookeeperConsumerConnectorTest.scala        | 25 +++---
 .../unit/kafka/server/LogRecoveryTest.scala     | 86 +++++++-------------
 .../unit/kafka/server/ReplicaFetchTest.scala    |  6 +-
 .../unit/kafka/server/ServerShutdownTest.scala  | 28 ++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala | 38 +++++----
 9 files changed, 116 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 7125ec9..95303e0 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -17,22 +17,25 @@
 
 package kafka.integration
 
-import junit.framework.Assert._
 import kafka.utils.{ZKGroupTopicDirs, Logging}
 import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
 import kafka.server._
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
 import kafka.serializer._
 import kafka.producer.{Producer, KeyedMessage}
 
+import org.junit.Test
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+
 class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+
   val topic = "test_topic"
   val group = "default_group"
   val testConsumer = "consumer"
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
   val NumMessages = 10
   val LargeOffset = 10000
   val SmallOffset = -1
@@ -51,69 +54,39 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
     super.tearDown
   }
 
-  // fake test so that this test can pass
-  def testResetToEarliestWhenOffsetTooHigh() =
-    assertTrue(true)
-
-  /*  Temporarily disable those tests due to failures.
-kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooHigh(AutoOffsetResetTest.scala:55)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooLow(AutoOffsetResetTest.scala:58)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooHigh(AutoOffsetResetTest.scala:61)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooLow(AutoOffsetResetTest.scala:64)
-
+  @Test
   def testResetToEarliestWhenOffsetTooHigh() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", LargeOffset))
-  
+
+  @Test
   def testResetToEarliestWhenOffsetTooLow() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
 
+  @Test
   def testResetToLatestWhenOffsetTooHigh() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
 
+  @Test
   def testResetToLatestWhenOffsetTooLow() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
-  */
 
   /* Produce the given number of messages, create a consumer with the given offset policy, 
    * then reset the offset to the given value and consume until we get no new messages. 
    * Returns the count of messages received.
    */
   def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    TestUtils.createTopic(zkClient, topic, 1, 1, servers)
 
-    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), 
-        new DefaultEncoder(), new StringEncoder())
+    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
+      TestUtils.getBrokerListStrFromConfigs(configs),
+      keyEncoder = classOf[StringEncoder].getName)
 
     for(i <- 0 until numMessages)
       producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
 
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
-
     // update offset in zookeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
-    var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
+    val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
     consumerProps.put("auto.offset.reset", resetTo)
     consumerProps.put("consumer.timeout.ms", "2000")
     consumerProps.put("fetch.wait.max.ms", "0")

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 4075068..25845ab 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -82,9 +82,9 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
   def sendMessages(messagesPerNode: Int): Int = {
     var count = 0
     for(conf <- configs) {
-      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
-                                                                             new DefaultEncoder(),
-                                                                             new StringEncoder())
+      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
+        TestUtils.getBrokerListStrFromConfigs(configs),
+        keyEncoder = classOf[StringEncoder].getName)
       val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
       messages += conf.brokerId -> ms
       producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 731ee59..108c2e7 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -19,8 +19,10 @@ package kafka.integration
 
 import kafka.consumer.SimpleConsumer
 import org.scalatest.junit.JUnit3Suite
-import kafka.producer.{ProducerConfig, Producer}
-import kafka.utils.TestUtils
+import kafka.producer.Producer
+import kafka.utils.{StaticPartitioner, TestUtils}
+import kafka.serializer.StringEncoder
+
 trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
     val port: Int
     val host = "localhost"
@@ -29,8 +31,10 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
 
   override def setUp() {
       super.setUp
-      val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
-      producer = new Producer(new ProducerConfig(props))
+      producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[StringEncoder].getName,
+        partitioner = classOf[StaticPartitioner].getName)
       consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index d1d969e..f44568c 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -251,10 +251,9 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   private def produceMessage(topic: String, message: String) = {
-    val props = new Properties()
-    props.put("request.required.acks", String.valueOf(-1))
-    val producer: Producer[String, Array[Byte]] = createProducer(getBrokerListStrFromConfigs(configs),
-      new DefaultEncoder(), new StringEncoder(), props)
+    val producer: Producer[String, Array[Byte]] = createProducer(
+      getBrokerListStrFromConfigs(configs),
+      keyEncoder = classOf[StringEncoder].getName)
     producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
     producer.close()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 16e7164..20e8efe 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -17,22 +17,24 @@
 
 package kafka.javaapi.consumer
 
-import junit.framework.Assert._
-import kafka.integration.KafkaServerTestHarness
 import kafka.server._
-import org.scalatest.junit.JUnit3Suite
-import scala.collection.JavaConversions
-import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer._
+import kafka.integration.KafkaServerTestHarness
 import kafka.producer.KeyedMessage
 import kafka.javaapi.producer.Producer
 import kafka.utils.IntEncoder
-import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
 
+import scala.collection.JavaConversions
+
+import org.scalatest.junit.JUnit3Suite
+import org.apache.log4j.{Level, Logger}
+import junit.framework.Assert._
+
+
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
 
   val zookeeperConnect = zkConnect
@@ -52,14 +54,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   def testBasic() {
     val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
-    var actualMessages: List[Message] = Nil
+
+    // create the topic
+    TestUtils.createTopic(zkClient, topic, numParts, 1, servers)
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1")
 
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
-
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -79,7 +80,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                    compressed: CompressionCodec): List[String] = {
     var messages: List[String] = Nil
     val producer: kafka.producer.Producer[Int, String] = 
-      TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder(), new IntEncoder())
+      TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName)
     val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 1b87acf..b349fce 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -16,15 +16,18 @@
 */
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Assert._
-import java.io.File
 import kafka.utils.TestUtils._
 import kafka.utils.IntEncoder
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common._
-import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
+import kafka.producer.{KeyedMessage, Producer}
+import kafka.serializer.StringEncoder
+
+import java.io.File
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -50,29 +53,33 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
   
-  val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
-  producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
-  producerProps.put("request.required.acks", "-1")
-  
-  override def tearDown() {
-    for(server <- servers) {
-      server.shutdown()
-      Utils.rm(server.config.logDirs(0))
-    }
-    super.tearDown()
-  }
+  override def setUp() {
+    super.setUp()
 
-  def testHWCheckpointNoFailuresSingleLogSegment {
     // start both servers
     server1 = TestUtils.createServer(configProps1)
     server2 = TestUtils.createServer(configProps2)
     servers ++= List(server1, server2)
 
-    producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
     // create topic with 1 partition, 2 replicas, one on each broker
     createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
 
+    // create the producer
+    producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(configs),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[IntEncoder].getName)
+  }
+
+  override def tearDown() {
+    producer.close()
+    for(server <- servers) {
+      server.shutdown()
+      Utils.rm(server.config.logDirs(0))
+    }
+    super.tearDown()
+  }
+
+  def testHWCheckpointNoFailuresSingleLogSegment {
     val numMessages = 2L
     sendMessages(numMessages.toInt)
 
@@ -82,7 +89,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
       "Failed to update high watermark for follower after timeout")
 
     servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
-    producer.close()
     val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(numMessages, leaderHW)
     val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -90,15 +96,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   def testHWCheckpointWithFailuresSingleLogSegment {
-    // start both servers
-    server1 = TestUtils.createServer(configProps1)
-    server2 = TestUtils.createServer(configProps2)
-    servers ++= List(server1, server2)
-
-    producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)(0)
+    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
 
     assertEquals(0L, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
@@ -140,34 +138,19 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
-    producer.close()
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
   }
 
   def testHWCheckpointNoFailuresMultipleLogSegments {
-    // start both servers
-    server1 = TestUtils.createServer(configs.head)
-    server2 = TestUtils.createServer(configs.last)
-    servers ++= List(server1, server2)
-
-    hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-    hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-
-    producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-
     sendMessages(20)
-    var hw = 20L
+    val hw = 20L
     // give some time for follower 1 to record leader HW of 600
     TestUtils.waitUntilTrue(() =>
       server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
-    producer.close()
     val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(hw, leaderHW)
     val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -175,19 +158,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   def testHWCheckpointWithFailuresMultipleLogSegments {
-    // start both servers
-    server1 = TestUtils.createServer(configs.head)
-    server2 = TestUtils.createServer(configs.last)
-    servers ++= List(server1, server2)
-
-    hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-    hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-
-    producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(server1.config.brokerId, server2.config.brokerId)),
-                servers = servers)(0)
+    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
 
     sendMessages(2)
     var hw = 2L
@@ -224,7 +195,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
-    producer.close()
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index faf466b..3e0bc18 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -54,9 +54,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
     }
 
     // send test messages to leader
-    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), 
-                                                            new StringEncoder(), 
-                                                            new StringEncoder())
+    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+                                                            encoder = classOf[StringEncoder].getName,
+                                                            keyEncoder = classOf[StringEncoder].getName)
     val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
     producer.send(messages:_*)
     producer.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index addd11a..014e964 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,18 +16,20 @@
  */
 package kafka.server
 
-import java.io.File
-import kafka.consumer.SimpleConsumer
-import org.junit.Test
-import junit.framework.Assert._
-import kafka.message.ByteBufferMessageSet
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
+import kafka.consumer.SimpleConsumer
 import kafka.producer._
-import kafka.utils.IntEncoder
+import kafka.utils.{IntEncoder, TestUtils, Utils}
 import kafka.utils.TestUtils._
 import kafka.api.FetchRequestBuilder
-import kafka.utils.{TestUtils, Utils}
+import kafka.message.ByteBufferMessageSet
+import kafka.serializer.StringEncoder
+
+import java.io.File
+
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
@@ -43,9 +45,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testCleanShutdown() {
     var server = new KafkaServer(config)
     server.startup()
-    val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)))
-    producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString)
-    var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
+    var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[IntEncoder].getName)
 
     // create topic
     createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
@@ -69,7 +71,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     // wait for the broker to receive the update metadata request after startup
     TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
 
-    producer = new Producer[Int, String](new ProducerConfig(producerConfig))
+    producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[IntEncoder].getName)
     val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
 
     var fetchedMessage: ByteBufferMessageSet = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 498941d..f5a7a5b 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -334,30 +334,40 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Create a producer for the given host and port
+   * Create a producer with a few pre-configured properties.
+   * If certain properties need to be overridden, they can be provided in producerProps.
    */
   def createProducer[K, V](brokerList: String, 
-                           encoder: Encoder[V] = new DefaultEncoder(), 
-                           keyEncoder: Encoder[K] = new DefaultEncoder(),
-                           props: Properties = new Properties()): Producer[K, V] = {
-    props.put("metadata.broker.list", brokerList)
-    props.put("send.buffer.bytes", "65536")
-    props.put("connect.timeout.ms", "100000")
-    props.put("reconnect.interval", "10000")
-    props.put("serializer.class", encoder.getClass.getCanonicalName)
-    props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName)
+                           encoder: String = classOf[DefaultEncoder].getName,
+                           keyEncoder: String = classOf[DefaultEncoder].getName,
+                           partitioner: String = classOf[DefaultPartitioner].getName,
+                           producerProps: Properties = null): Producer[K, V] = {
+    val props: Properties =
+    if (producerProps == null) {
+      getProducerConfig(brokerList)
+    } else {
+      producerProps.put("metadata.broker.list", brokerList)
+      producerProps
+    }
+    props.put("serializer.class", encoder)
+    props.put("key.serializer.class", keyEncoder)
+    props.put("partitioner.class", partitioner)
     new Producer[K, V](new ProducerConfig(props))
   }
 
-  def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = {
+  /**
+   * Create a default producer config properties map with the given metadata broker list
+   */
+  def getProducerConfig(brokerList: String): Properties = {
     val props = new Properties()
     props.put("metadata.broker.list", brokerList)
-    props.put("partitioner.class", partitioner)
     props.put("message.send.max.retries", "3")
     props.put("retry.backoff.ms", "1000")
     props.put("request.timeout.ms", "500")
     props.put("request.required.acks", "-1")
-    props.put("serializer.class", classOf[StringEncoder].getName.toString)
+    props.put("send.buffer.bytes", "65536")
+    props.put("connect.timeout.ms", "100000")
+    props.put("reconnect.interval", "10000")
 
     props
   }
@@ -368,7 +378,7 @@ object TestUtils extends Logging {
     props.put("port", port.toString)
     props.put("request.timeout.ms", "500")
     props.put("request.required.acks", "1")
-    props.put("serializer.class", classOf[StringEncoder].getName.toString)
+    props.put("serializer.class", classOf[StringEncoder].getName)
     props
   }