You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/22 22:56:07 UTC
git commit: kafka-1411; remove unused test cases; patched by Jun Rao;
reviewed by Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 3af3efe37 -> ed68ba402
kafka-1411; remove unused test cases; patched by Jun Rao; reviewed by Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed68ba40
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed68ba40
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed68ba40
Branch: refs/heads/trunk
Commit: ed68ba402e088070da0513a3675ed420ec6a70b0
Parents: 3af3efe
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Apr 22 13:56:00 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Apr 22 13:56:00 2014 -0700
----------------------------------------------------------------------
.../integration/LazyInitProducerTest.scala | 173 -------------------
.../kafka/integration/PrimitiveApiTest.scala | 18 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 36 ----
3 files changed, 1 insertion(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed68ba40/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
deleted file mode 100644
index c3c7631..0000000
--- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import kafka.api.FetchRequestBuilder
-import kafka.message.ByteBufferMessageSet
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
-import scala.collection._
-import kafka.utils._
-import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
-import kafka.producer.KeyedMessage
-import org.junit.Assert.assertEquals
-
-/**
- * End to end tests of the primitive apis against a local server
- */
-class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness {
-
- val port = TestUtils.choosePort
- val props = TestUtils.createBrokerConfig(0, port)
- val config = new KafkaConfig(props)
- val configs = List(config)
- val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
-
- override def setUp() {
- super.setUp
- if(configs.size <= 0)
- throw new KafkaException("Must suply at least one server config.")
-
- // temporarily set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.FATAL)
- }
-
- override def tearDown() {
- // restore set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.ERROR)
-
- super.tearDown
- }
-
- def testProduceAndFetch() {
- // send some messages
- val topic = "test"
- val sentMessages = List("hello", "there")
- val producerData = sentMessages.map(m => new KeyedMessage[String, String](topic, topic, m))
-
- producer.send(producerData:_*)
-
- TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-
- var fetchedMessage: ByteBufferMessageSet = null
- while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
- val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
- fetchedMessage = fetched.messageSet(topic, 0)
- }
- assertEquals(sentMessages, fetchedMessage.map(m => Utils.readString(m.message.payload)).toList)
-
- // send an invalid offset
- try {
- val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
- fetchedWithError.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
- fail("Expected an OffsetOutOfRangeException exception to be thrown")
- } catch {
- case e: OffsetOutOfRangeException =>
- }
- }
-
- def testProduceAndMultiFetch() {
- // send some messages, with non-ordered topics
- val topicOffsets = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
- {
- val messages = new mutable.HashMap[String, Seq[String]]
- val builder = new FetchRequestBuilder()
- for( (topic, offset) <- topicOffsets) {
- val producedData = List("a_" + topic, "b_" + topic)
- messages += topic -> producedData
- producer.send(producedData.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
- TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
- builder.addFetch(topic, offset, 0, 10000)
- }
-
- // wait a bit for produced message to be available
- val request = builder.build()
- val response = consumer.fetch(request)
- for( (topic, offset) <- topicOffsets) {
- val fetched = response.messageSet(topic, offset)
- assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload)))
- }
- }
-
- // send some invalid offsets
- val builder = new FetchRequestBuilder()
- for((topic, offset) <- topicOffsets)
- builder.addFetch(topic, offset, -1, 10000)
-
- val request = builder.build()
- val responses = consumer.fetch(request)
- responses.data.values.foreach(pd => {
- try {
- ErrorMapping.maybeThrowException(pd.error)
- fail("Expected an OffsetOutOfRangeException exception to be thrown")
- } catch {
- case e: OffsetOutOfRangeException => // this is good
- }
- })
- }
-
- def testMultiProduce() {
- // send some messages
- val topics = List("test1", "test2", "test3");
- val messages = new mutable.HashMap[String, Seq[String]]
- val builder = new FetchRequestBuilder()
- var produceList: List[KeyedMessage[String, String]] = Nil
- for(topic <- topics) {
- val set = List("a_" + topic, "b_" + topic)
- messages += topic -> set
- produceList ++= set.map(new KeyedMessage[String, String](topic, topic, _))
- builder.addFetch(topic, 0, 0, 10000)
- }
- producer.send(produceList: _*)
- topics.foreach(topic => TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000))
-
- // wait a bit for produced message to be available
- val request = builder.build()
- val response = consumer.fetch(request)
- for(topic <- topics) {
- val fetched = response.messageSet(topic, 0)
- assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload)))
- }
- }
-
- def testMultiProduceResend() {
- // send some messages
- val topics = List("test1", "test2", "test3");
- val messages = new mutable.HashMap[String, Seq[String]]
- val builder = new FetchRequestBuilder()
- var produceList: List[KeyedMessage[String, String]] = Nil
- for(topic <- topics) {
- val set = List("a_" + topic, "b_" + topic)
- messages += topic -> set
- produceList ++= set.map(new KeyedMessage[String, String](topic, topic, _))
- builder.addFetch(topic, 0, 0, 10000)
- }
- producer.send(produceList: _*)
- topics.foreach(topic => TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000))
-
- producer.send(produceList: _*)
- // wait a bit for produced message to be available
- val request = builder.build()
- val response = consumer.fetch(request)
- for(topic <- topics) {
- val topicMessages = response.messageSet(topic, 0)
- assertEquals(messages(topic) ++ messages(topic), topicMessages.map(m => Utils.readString(m.message.payload)))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed68ba40/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index a062f68..60a466f 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -94,7 +94,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testDefaultEncoderProducerAndFetchWithCompression() {
val topic = "test-topic"
val props = producer.config.props.props
- props.put("compression", "true")
+ props.put("compression.codec", "gzip")
val config = new ProducerConfig(props)
val stringProducer1 = new Producer[String, String](config)
@@ -178,14 +178,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
produceAndMultiFetch(noCompressionProducer)
}
- def testProduceAndMultiFetchWithCompression() {
- val props = producer.config.props.props
- props.put("compression", "true")
- val config = new ProducerConfig(props)
- val producerWithCompression = new Producer[String, String](config)
- produceAndMultiFetch(producerWithCompression)
- }
-
private def multiProduce(producer: Producer[String, String]) {
val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
@@ -215,14 +207,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
multiProduce(noCompressionProducer)
}
- def testMultiProduceWithCompression() {
- val props = producer.config.props.props
- props.put("compression", "true")
- val config = new ProducerConfig(props)
- val producerWithCompression = new Producer[String, String](config)
- multiProduce(producerWithCompression)
- }
-
def testConsumerEmptyTopic() {
val newTopic = "new-topic"
AdminUtils.createTopic(zkClient, newTopic, 1, 1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed68ba40/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 18e3555..bdc6f01 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -356,42 +356,6 @@ class AsyncProducerTest extends JUnit3Suite {
}
@Test
- def testBrokerListAndAsync() {
- return
- val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
- props.put("producer.type", "async")
- props.put("batch.num.messages", "5")
-
- val config = new ProducerConfig(props)
-
- val topic = "topic1"
- val topic1Metadata = getTopicMetadata(topic, 0, 0, "localhost", 9092)
- val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
- topicPartitionInfos.put("topic1", topic1Metadata)
-
- val producerPool = new ProducerPool(config)
-
- val msgs = TestUtils.getMsgStrings(10)
-
- val handler = new DefaultEventHandler[String,String](config,
- partitioner = null.asInstanceOf[Partitioner],
- encoder = new StringEncoder,
- keyEncoder = new StringEncoder,
- producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos)
-
- val producer = new Producer[String, String](config, handler)
- try {
- // send all 10 messages, should create 2 batches and 2 syncproducer calls
- producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
- producer.close
-
- } catch {
- case e: Exception => fail("Not expected", e)
- }
- }
-
- @Test
def testFailedSendRetryLogic() {
val props = new Properties()
props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))