You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/22 22:56:07 UTC

git commit: kafka-1411; remove unused test cases; patched by Jun Rao; reviewed by Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 3af3efe37 -> ed68ba402


kafka-1411; remove unused test cases; patched by Jun Rao; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed68ba40
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed68ba40
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed68ba40

Branch: refs/heads/trunk
Commit: ed68ba402e088070da0513a3675ed420ec6a70b0
Parents: 3af3efe
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Apr 22 13:56:00 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Apr 22 13:56:00 2014 -0700

----------------------------------------------------------------------
 .../integration/LazyInitProducerTest.scala      | 173 -------------------
 .../kafka/integration/PrimitiveApiTest.scala    |  18 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |  36 ----
 3 files changed, 1 insertion(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ed68ba40/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
deleted file mode 100644
index c3c7631..0000000
--- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import kafka.api.FetchRequestBuilder
-import kafka.message.ByteBufferMessageSet
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
-import scala.collection._
-import kafka.utils._
-import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
-import kafka.producer.KeyedMessage
-import org.junit.Assert.assertEquals
-
-/**
- * End to end tests of the primitive apis against a local server
- */
-class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness {
-
-  val port = TestUtils.choosePort
-  val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props)
-  val configs = List(config)
-  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
-
-  override def setUp() {
-    super.setUp
-    if(configs.size <= 0)
-      throw new KafkaException("Must suply at least one server config.")
-
-    // temporarily set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.FATAL)    
-  }
-
-  override def tearDown() {
-    // restore set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.ERROR)
-
-    super.tearDown    
-  }
-  
-  def testProduceAndFetch() {
-    // send some messages
-    val topic = "test"
-    val sentMessages = List("hello", "there")
-    val producerData = sentMessages.map(m => new KeyedMessage[String, String](topic, topic, m))
-
-    producer.send(producerData:_*)
-
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-
-    var fetchedMessage: ByteBufferMessageSet = null
-    while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
-      val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
-      fetchedMessage = fetched.messageSet(topic, 0)
-    }
-    assertEquals(sentMessages, fetchedMessage.map(m => Utils.readString(m.message.payload)).toList)
-
-    // send an invalid offset
-    try {
-      val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
-      fetchedWithError.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
-      fail("Expected an OffsetOutOfRangeException exception to be thrown")
-    } catch {
-      case e: OffsetOutOfRangeException => 
-    }
-  }
-
-  def testProduceAndMultiFetch() {
-    // send some messages, with non-ordered topics
-    val topicOffsets = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    {
-      val messages = new mutable.HashMap[String, Seq[String]]
-      val builder = new FetchRequestBuilder()
-      for( (topic, offset) <- topicOffsets) {
-        val producedData = List("a_" + topic, "b_" + topic)
-        messages += topic -> producedData
-        producer.send(producedData.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
-        TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-        builder.addFetch(topic, offset, 0, 10000)
-      }
-
-      // wait a bit for produced message to be available
-      val request = builder.build()
-      val response = consumer.fetch(request)
-      for( (topic, offset) <- topicOffsets) {
-        val fetched = response.messageSet(topic, offset)
-        assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload)))
-      }
-    }
-
-    // send some invalid offsets
-    val builder = new FetchRequestBuilder()
-    for((topic, offset) <- topicOffsets)
-      builder.addFetch(topic, offset, -1, 10000)
-
-    val request = builder.build()
-    val responses = consumer.fetch(request)
-    responses.data.values.foreach(pd => {
-      try {
-        ErrorMapping.maybeThrowException(pd.error)
-        fail("Expected an OffsetOutOfRangeException exception to be thrown")
-      } catch {
-        case e: OffsetOutOfRangeException => // this is good
-      }
-    })
-  }
-
-  def testMultiProduce() {
-    // send some messages
-    val topics = List("test1", "test2", "test3");
-    val messages = new mutable.HashMap[String, Seq[String]]
-    val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
-    for(topic <- topics) {
-      val set = List("a_" + topic, "b_" + topic)
-      messages += topic -> set
-      produceList ++= set.map(new KeyedMessage[String, String](topic, topic, _))
-      builder.addFetch(topic, 0, 0, 10000)
-    }
-    producer.send(produceList: _*)
-    topics.foreach(topic => TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000))
-
-    // wait a bit for produced message to be available
-    val request = builder.build()
-    val response = consumer.fetch(request)
-    for(topic <- topics) {
-      val fetched = response.messageSet(topic, 0)
-      assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload)))
-    }
-  }
-
-  def testMultiProduceResend() {
-    // send some messages
-    val topics = List("test1", "test2", "test3");
-    val messages = new mutable.HashMap[String, Seq[String]]
-    val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
-    for(topic <- topics) {
-      val set = List("a_" + topic, "b_" + topic)
-      messages += topic -> set
-      produceList ++= set.map(new KeyedMessage[String, String](topic, topic, _))
-      builder.addFetch(topic, 0, 0, 10000)
-    }
-    producer.send(produceList: _*)
-    topics.foreach(topic => TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000))
-
-    producer.send(produceList: _*)
-    // wait a bit for produced message to be available
-    val request = builder.build()
-    val response = consumer.fetch(request)
-    for(topic <- topics) {
-      val topicMessages = response.messageSet(topic, 0)
-      assertEquals(messages(topic) ++ messages(topic), topicMessages.map(m => Utils.readString(m.message.payload)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed68ba40/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index a062f68..60a466f 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -94,7 +94,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   def testDefaultEncoderProducerAndFetchWithCompression() {
     val topic = "test-topic"
     val props = producer.config.props.props
-    props.put("compression", "true")
+    props.put("compression.codec", "gzip")
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
@@ -178,14 +178,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     produceAndMultiFetch(noCompressionProducer)
   }
 
-  def testProduceAndMultiFetchWithCompression() {
-    val props = producer.config.props.props
-    props.put("compression", "true")
-    val config = new ProducerConfig(props)
-    val producerWithCompression = new Producer[String, String](config)
-    produceAndMultiFetch(producerWithCompression)
-  }
-
   private def multiProduce(producer: Producer[String, String]) {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
     createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
@@ -215,14 +207,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     multiProduce(noCompressionProducer)
   }
 
-  def testMultiProduceWithCompression() {
-    val props = producer.config.props.props
-    props.put("compression", "true")
-    val config = new ProducerConfig(props)
-    val producerWithCompression = new Producer[String, String](config)
-    multiProduce(producerWithCompression)
-  }
-
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
     AdminUtils.createTopic(zkClient, newTopic, 1, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed68ba40/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 18e3555..bdc6f01 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -356,42 +356,6 @@ class AsyncProducerTest extends JUnit3Suite {
   }
 
   @Test
-  def testBrokerListAndAsync() {
-    return
-    val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
-    props.put("producer.type", "async")
-    props.put("batch.num.messages", "5")
-
-    val config = new ProducerConfig(props)
-
-    val topic = "topic1"
-    val topic1Metadata = getTopicMetadata(topic, 0, 0, "localhost", 9092)
-    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
-    topicPartitionInfos.put("topic1", topic1Metadata)
-
-    val producerPool = new ProducerPool(config)
-
-    val msgs = TestUtils.getMsgStrings(10)
-
-    val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner],
-                                                         encoder = new StringEncoder,
-                                                         keyEncoder = new StringEncoder,
-                                                         producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos)
-
-    val producer = new Producer[String, String](config, handler)
-    try {
-      // send all 10 messages, should create 2 batches and 2 syncproducer calls
-      producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
-      producer.close
-
-    } catch {
-      case e: Exception => fail("Not expected", e)
-    }
-  }
-
-  @Test
   def testFailedSendRetryLogic() {
     val props = new Properties()
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))