You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/25 23:31:37 UTC
git commit: kafka-1395; fix unit tests in AutoOffsetResetTest;
patched by Guozhang Wang; reviewed by Neha Narkhede, Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk c9bb24f15 -> 8a0314d01
kafka-1395; fix unit tests in AutoOffsetResetTest; patched by Guozhang Wang; reviewed by Neha Narkhede, Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8a0314d0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8a0314d0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8a0314d0
Branch: refs/heads/trunk
Commit: 8a0314d01786541f06d0eb41b2e81c8bb1d45a8d
Parents: c9bb24f
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri Apr 25 14:31:33 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Apr 25 14:31:33 2014 -0700
----------------------------------------------------------------------
.../kafka/integration/AutoOffsetResetTest.scala | 61 ++++----------
.../unit/kafka/integration/FetcherTest.scala | 6 +-
.../ProducerConsumerTestHarness.scala | 12 ++-
.../integration/UncleanLeaderElectionTest.scala | 7 +-
.../ZookeeperConsumerConnectorTest.scala | 25 +++---
.../unit/kafka/server/LogRecoveryTest.scala | 86 +++++++-------------
.../unit/kafka/server/ReplicaFetchTest.scala | 6 +-
.../unit/kafka/server/ServerShutdownTest.scala | 28 ++++---
.../test/scala/unit/kafka/utils/TestUtils.scala | 38 +++++----
9 files changed, 116 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 7125ec9..95303e0 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -17,22 +17,25 @@
package kafka.integration
-import junit.framework.Assert._
import kafka.utils.{ZKGroupTopicDirs, Logging}
import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
import kafka.server._
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
import kafka.utils.TestUtils
import kafka.serializer._
import kafka.producer.{Producer, KeyedMessage}
+import org.junit.Test
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+
class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+ val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+
val topic = "test_topic"
val group = "default_group"
val testConsumer = "consumer"
- val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
val NumMessages = 10
val LargeOffset = 10000
val SmallOffset = -1
@@ -51,69 +54,39 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
super.tearDown
}
- // fake test so that this test can pass
- def testResetToEarliestWhenOffsetTooHigh() =
- assertTrue(true)
-
- /* Temporarily disable those tests due to failures.
-kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh FAILED
- java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
- at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
- at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
- at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooHigh(AutoOffsetResetTest.scala:55)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow FAILED
- java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
- at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
- at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
- at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooLow(AutoOffsetResetTest.scala:58)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh FAILED
- java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
- at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
- at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
- at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooHigh(AutoOffsetResetTest.scala:61)
-
-
-kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow FAILED
- java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
- at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
- at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
- at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooLow(AutoOffsetResetTest.scala:64)
-
+ @Test
def testResetToEarliestWhenOffsetTooHigh() =
assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", LargeOffset))
-
+
+ @Test
def testResetToEarliestWhenOffsetTooLow() =
assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
+ @Test
def testResetToLatestWhenOffsetTooHigh() =
assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
+ @Test
def testResetToLatestWhenOffsetTooLow() =
assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
- */
/* Produce the given number of messages, create a consumer with the given offset policy,
* then reset the offset to the given value and consume until we get no new messages.
* Returns the count of messages received.
*/
def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+ TestUtils.createTopic(zkClient, topic, 1, 1, servers)
- val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
- new DefaultEncoder(), new StringEncoder())
+ val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
+ TestUtils.getBrokerListStrFromConfigs(configs),
+ keyEncoder = classOf[StringEncoder].getName)
for(i <- 0 until numMessages)
producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
- TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
-
// update offset in zookeeper for consumer to jump "forward" in time
val dirs = new ZKGroupTopicDirs(group, topic)
- var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
+ val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
consumerProps.put("auto.offset.reset", resetTo)
consumerProps.put("consumer.timeout.ms", "2000")
consumerProps.put("fetch.wait.max.ms", "0")
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 4075068..25845ab 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -82,9 +82,9 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
def sendMessages(messagesPerNode: Int): Int = {
var count = 0
for(conf <- configs) {
- val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
- new DefaultEncoder(),
- new StringEncoder())
+ val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
+ TestUtils.getBrokerListStrFromConfigs(configs),
+ keyEncoder = classOf[StringEncoder].getName)
val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
messages += conf.brokerId -> ms
producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 731ee59..108c2e7 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -19,8 +19,10 @@ package kafka.integration
import kafka.consumer.SimpleConsumer
import org.scalatest.junit.JUnit3Suite
-import kafka.producer.{ProducerConfig, Producer}
-import kafka.utils.TestUtils
+import kafka.producer.Producer
+import kafka.utils.{StaticPartitioner, TestUtils}
+import kafka.serializer.StringEncoder
+
trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
val port: Int
val host = "localhost"
@@ -29,8 +31,10 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
override def setUp() {
super.setUp
- val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
- producer = new Producer(new ProducerConfig(props))
+ producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[StringEncoder].getName,
+ partitioner = classOf[StaticPartitioner].getName)
consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index d1d969e..f44568c 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -251,10 +251,9 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
}
private def produceMessage(topic: String, message: String) = {
- val props = new Properties()
- props.put("request.required.acks", String.valueOf(-1))
- val producer: Producer[String, Array[Byte]] = createProducer(getBrokerListStrFromConfigs(configs),
- new DefaultEncoder(), new StringEncoder(), props)
+ val producer: Producer[String, Array[Byte]] = createProducer(
+ getBrokerListStrFromConfigs(configs),
+ keyEncoder = classOf[StringEncoder].getName)
producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
producer.close()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 16e7164..20e8efe 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -17,22 +17,24 @@
package kafka.javaapi.consumer
-import junit.framework.Assert._
-import kafka.integration.KafkaServerTestHarness
import kafka.server._
-import org.scalatest.junit.JUnit3Suite
-import scala.collection.JavaConversions
-import org.apache.log4j.{Level, Logger}
import kafka.message._
import kafka.serializer._
+import kafka.integration.KafkaServerTestHarness
import kafka.producer.KeyedMessage
import kafka.javaapi.producer.Producer
import kafka.utils.IntEncoder
-import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
import kafka.consumer.{KafkaStream, ConsumerConfig}
import kafka.zk.ZooKeeperTestHarness
+import scala.collection.JavaConversions
+
+import org.scalatest.junit.JUnit3Suite
+import org.apache.log4j.{Level, Logger}
+import junit.framework.Assert._
+
+
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
val zookeeperConnect = zkConnect
@@ -52,14 +54,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def testBasic() {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
- var actualMessages: List[Message] = Nil
+
+ // create the topic
+ TestUtils.createTopic(zkClient, topic, numParts, 1, servers)
// send some messages to each broker
val sentMessages1 = sendMessages(nMessages, "batch1")
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
-
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -79,7 +80,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
compressed: CompressionCodec): List[String] = {
var messages: List[String] = Nil
val producer: kafka.producer.Producer[Int, String] =
- TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder(), new IntEncoder())
+ TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[IntEncoder].getName)
val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 1b87acf..b349fce 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -16,15 +16,18 @@
*/
package kafka.server
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Assert._
-import java.io.File
import kafka.utils.TestUtils._
import kafka.utils.IntEncoder
import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.common._
-import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
+import kafka.producer.{KeyedMessage, Producer}
+import kafka.serializer.StringEncoder
+
+import java.io.File
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -50,29 +53,33 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
- val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
- producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
- producerProps.put("request.required.acks", "-1")
-
- override def tearDown() {
- for(server <- servers) {
- server.shutdown()
- Utils.rm(server.config.logDirs(0))
- }
- super.tearDown()
- }
+ override def setUp() {
+ super.setUp()
- def testHWCheckpointNoFailuresSingleLogSegment {
// start both servers
server1 = TestUtils.createServer(configProps1)
server2 = TestUtils.createServer(configProps2)
servers ++= List(server1, server2)
- producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
// create topic with 1 partition, 2 replicas, one on each broker
createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+ // create the producer
+ producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(configs),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[IntEncoder].getName)
+ }
+
+ override def tearDown() {
+ producer.close()
+ for(server <- servers) {
+ server.shutdown()
+ Utils.rm(server.config.logDirs(0))
+ }
+ super.tearDown()
+ }
+
+ def testHWCheckpointNoFailuresSingleLogSegment {
val numMessages = 2L
sendMessages(numMessages.toInt)
@@ -82,7 +89,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
"Failed to update high watermark for follower after timeout")
servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
- producer.close()
val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(numMessages, leaderHW)
val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -90,15 +96,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testHWCheckpointWithFailuresSingleLogSegment {
- // start both servers
- server1 = TestUtils.createServer(configProps1)
- server2 = TestUtils.createServer(configProps2)
- servers ++= List(server1, server2)
-
- producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
- // create topic with 1 partition, 2 replicas, one on each broker
- var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)(0)
+ var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
assertEquals(0L, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
@@ -140,34 +138,19 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
"Failed to update high watermark for follower after timeout")
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
- producer.close()
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
}
def testHWCheckpointNoFailuresMultipleLogSegments {
- // start both servers
- server1 = TestUtils.createServer(configs.head)
- server2 = TestUtils.createServer(configs.last)
- servers ++= List(server1, server2)
-
- hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
- hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-
- producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
- // create topic with 1 partition, 2 replicas, one on each broker
- createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-
sendMessages(20)
- var hw = 20L
+ val hw = 20L
// give some time for follower 1 to record leader HW of 600
TestUtils.waitUntilTrue(() =>
server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
"Failed to update high watermark for follower after timeout")
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
- producer.close()
val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(hw, leaderHW)
val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -175,19 +158,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testHWCheckpointWithFailuresMultipleLogSegments {
- // start both servers
- server1 = TestUtils.createServer(configs.head)
- server2 = TestUtils.createServer(configs.last)
- servers ++= List(server1, server2)
-
- hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
- hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
-
- producer = new Producer[Int, String](new ProducerConfig(producerProps))
-
- // create topic with 1 partition, 2 replicas, one on each broker
- var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(server1.config.brokerId, server2.config.brokerId)),
- servers = servers)(0)
+ var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
sendMessages(2)
var hw = 2L
@@ -224,7 +195,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
"Failed to update high watermark for follower after timeout")
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
- producer.close()
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index faf466b..3e0bc18 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -54,9 +54,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
}
// send test messages to leader
- val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
- new StringEncoder(),
- new StringEncoder())
+ val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[StringEncoder].getName)
val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
producer.send(messages:_*)
producer.close()
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index addd11a..014e964 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,18 +16,20 @@
*/
package kafka.server
-import java.io.File
-import kafka.consumer.SimpleConsumer
-import org.junit.Test
-import junit.framework.Assert._
-import kafka.message.ByteBufferMessageSet
-import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
+import kafka.consumer.SimpleConsumer
import kafka.producer._
-import kafka.utils.IntEncoder
+import kafka.utils.{IntEncoder, TestUtils, Utils}
import kafka.utils.TestUtils._
import kafka.api.FetchRequestBuilder
-import kafka.utils.{TestUtils, Utils}
+import kafka.message.ByteBufferMessageSet
+import kafka.serializer.StringEncoder
+
+import java.io.File
+
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val port = TestUtils.choosePort
@@ -43,9 +45,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCleanShutdown() {
var server = new KafkaServer(config)
server.startup()
- val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)))
- producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString)
- var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
+ var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[IntEncoder].getName)
// create topic
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
@@ -69,7 +71,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
// wait for the broker to receive the update metadata request after startup
TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
- producer = new Producer[Int, String](new ProducerConfig(producerConfig))
+ producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[IntEncoder].getName)
val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
var fetchedMessage: ByteBufferMessageSet = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a0314d0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 498941d..f5a7a5b 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -334,30 +334,40 @@ object TestUtils extends Logging {
}
/**
- * Create a producer for the given host and port
+ * Create a producer with a few pre-configured properties.
+ * If certain properties need to be overridden, they can be provided in producerProps.
*/
def createProducer[K, V](brokerList: String,
- encoder: Encoder[V] = new DefaultEncoder(),
- keyEncoder: Encoder[K] = new DefaultEncoder(),
- props: Properties = new Properties()): Producer[K, V] = {
- props.put("metadata.broker.list", brokerList)
- props.put("send.buffer.bytes", "65536")
- props.put("connect.timeout.ms", "100000")
- props.put("reconnect.interval", "10000")
- props.put("serializer.class", encoder.getClass.getCanonicalName)
- props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName)
+ encoder: String = classOf[DefaultEncoder].getName,
+ keyEncoder: String = classOf[DefaultEncoder].getName,
+ partitioner: String = classOf[DefaultPartitioner].getName,
+ producerProps: Properties = null): Producer[K, V] = {
+ val props: Properties =
+ if (producerProps == null) {
+ getProducerConfig(brokerList)
+ } else {
+ producerProps.put("metadata.broker.list", brokerList)
+ producerProps
+ }
+ props.put("serializer.class", encoder)
+ props.put("key.serializer.class", keyEncoder)
+ props.put("partitioner.class", partitioner)
new Producer[K, V](new ProducerConfig(props))
}
- def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = {
+ /**
+ * Create a default producer config properties map with the given metadata broker list
+ */
+ def getProducerConfig(brokerList: String): Properties = {
val props = new Properties()
props.put("metadata.broker.list", brokerList)
- props.put("partitioner.class", partitioner)
props.put("message.send.max.retries", "3")
props.put("retry.backoff.ms", "1000")
props.put("request.timeout.ms", "500")
props.put("request.required.acks", "-1")
- props.put("serializer.class", classOf[StringEncoder].getName.toString)
+ props.put("send.buffer.bytes", "65536")
+ props.put("connect.timeout.ms", "100000")
+ props.put("reconnect.interval", "10000")
props
}
@@ -368,7 +378,7 @@ object TestUtils extends Logging {
props.put("port", port.toString)
props.put("request.timeout.ms", "500")
props.put("request.required.acks", "1")
- props.put("serializer.class", classOf[StringEncoder].getName.toString)
+ props.put("serializer.class", classOf[StringEncoder].getName)
props
}