Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/31 15:13:33 UTC

kafka git commit: MINOR: Use new consumer in ProducerCompressionTest

Repository: kafka
Updated Branches:
  refs/heads/trunk aebba89a2 -> 9323a7533


MINOR: Use new consumer in ProducerCompressionTest

This should be less flaky as the new consumer uses a higher timeout. I also increased the timeout
in a couple of other tests that had very low (100 ms) timeouts.

The failure would manifest itself as:

```text
java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:85)
	at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:100)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:84)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:133)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:133)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:133)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:132)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:132)
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:132)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:131)
	at kafka.api.test.ProducerCompressionTest.testCompression(ProducerCompressionTest.scala:97)
```
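
For context, the 100 ms value mentioned above is the socket timeout passed as the third argument to the old `SimpleConsumer`. A minimal sketch of the before/after constructions (the port `9092` is a placeholder here; the tests obtain it from `TestUtils.boundPort`):

```scala
import kafka.consumer.SimpleConsumer

// Old construction: a 100 ms socket timeout, so any fetch taking longer than
// 100 ms surfaces as the SocketTimeoutException in the stack trace above.
val flakyConsumer = new SimpleConsumer("localhost", 9092, 100, 1024 * 1024, "")

// Raising the timeout (as done for ProducerBounceTest in the diff below)
// gives slow fetches room to complete.
val patientConsumer = new SimpleConsumer("localhost", 9092, 30000, 1024 * 1024, "")
```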

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #3178 from ijuma/producer-compression-test-flaky


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9323a753
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9323a753
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9323a753

Branch: refs/heads/trunk
Commit: 9323a753355abcb62fe8121303e5e65dbfe018ea
Parents: aebba89
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed May 31 16:13:30 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 31 16:13:30 2017 +0100

----------------------------------------------------------------------
 .../kafka/api/BaseProducerSendTest.scala        |  6 +-
 .../kafka/api/ProducerBounceTest.scala          |  2 +-
 .../kafka/api/ProducerCompressionTest.scala     | 88 +++++++++-----------
 .../unit/kafka/producer/ProducerTest.scala      |  2 +-
 4 files changed, 44 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 42e3b11..645f6ac 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -187,13 +187,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     try {
       TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
 
-      val recordAndFutures = for (i <- 1 to numRecords) yield {
+      val futures = for (i <- 1 to numRecords) yield {
         val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8),
           s"value$i".getBytes(StandardCharsets.UTF_8))
-        (record, producer.send(record))
+        producer.send(record)
       }
       producer.close(timeoutMs, TimeUnit.MILLISECONDS)
-      val lastOffset = recordAndFutures.foldLeft(0) { case (offset, (record, future)) =>
+      val lastOffset = futures.foldLeft(0) { (offset, future) =>
         val recordMetadata = future.get
         assertEquals(topic, recordMetadata.topic)
         assertEquals(partition, recordMetadata.partition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 5fead18..9fe0e5c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -101,7 +101,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i))
     val fetchResponses = newLeaders.zipWithIndex.map { case (leader, partition) =>
       // Consumers must be instantiated after all the restarts since they use random ports each time they start up
-      val consumer = new SimpleConsumer("localhost", boundPort(servers(leader)), 100, 1024 * 1024, "")
+      val consumer = new SimpleConsumer("localhost", boundPort(servers(leader)), 30000, 1024 * 1024, "")
       val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
       consumer.close
       response

http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 2001095..23b78b0 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -17,7 +17,8 @@
 
 package kafka.api.test
 
-import java.util.{ArrayList, Collection, Properties}
+import java.util.{Collection, Collections, Properties}
+import scala.collection.JavaConverters._
 
 import org.junit.runners.Parameterized
 import org.junit.runner.RunWith
@@ -25,30 +26,27 @@ import org.junit.runners.Parameterized.Parameters
 import org.junit.{After, Before, Test}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.junit.Assert._
-import kafka.api.FetchRequestBuilder
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.consumer.SimpleConsumer
-import kafka.message.Message
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils
-
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.serialization.ByteArraySerializer
 
 @RunWith(value = classOf[Parameterized])
 class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness {
-  private val brokerId = 0
-  private var server: KafkaServer = null
 
+  private val brokerId = 0
   private val topic = "topic"
   private val numRecords = 2000
 
+  private var server: KafkaServer = null
+
   @Before
   override def setUp() {
     super.setUp()
-
     val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
-    val config = KafkaConfig.fromProps(props)
-
-    server = TestUtils.createServer(config)
+    server = TestUtils.createServer(KafkaConfig.fromProps(props))
   }
 
   @After
@@ -65,15 +63,14 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
   @Test
   def testCompression() {
 
-    val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(Seq(server)))
-    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
-    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
-    props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
-    val consumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 100, 1024*1024, "")
+    val producerProps = new Properties()
+    val bootstrapServers = TestUtils.getBrokerListStrFromServers(Seq(server))
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
+    producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200")
+    val producer = new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer)
+    val consumer = TestUtils.createNewConsumer(bootstrapServers, securityProtocol = SecurityProtocol.PLAINTEXT)
 
     try {
       // create topic
@@ -81,50 +78,43 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
       val partition = 0
 
       // prepare the messages
-      val messages = for (i <-0 until numRecords)
-        yield ("value" + i).getBytes
+      val messageValues = (0 until numRecords).map(i => "value" + i)
 
       // make sure the returned messages are correct
       val now = System.currentTimeMillis()
-      val responses = for (message <- messages)
-        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, now, null, message))
-      val futures = responses.toList
-      for ((future, offset) <- futures zip (0 until numRecords)) {
+      val responses = for (message <- messageValues)
+        yield producer.send(new ProducerRecord(topic, null, now, null, message.getBytes))
+      for ((future, offset) <- responses.zipWithIndex) {
         assertEquals(offset.toLong, future.get.offset)
       }
 
+      val tp = new TopicPartition(topic, partition)
       // make sure the fetched message count match
-      val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
-      val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer
-      assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size)
-
-      var index = 0
-      for (message <- messages) {
-        assertEquals(new Message(bytes = message, now, Message.MagicValue_V1), messageSet(index).message)
-        assertEquals(index.toLong, messageSet(index).offset)
-        index += 1
+      consumer.assign(Collections.singleton(tp))
+      consumer.seek(tp, 0)
+      val records = TestUtils.consumeRecords(consumer, numRecords)
+
+      for (((messageValue, record), index) <- messageValues.zip(records).zipWithIndex) {
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(now, record.timestamp)
+        assertEquals(index.toLong, record.offset)
       }
     } finally {
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
-      if (consumer != null)
-        consumer.close()
+      producer.close()
+      consumer.close()
     }
   }
 }
 
 object ProducerCompressionTest {
 
-  // NOTE: Must return collection of Array[AnyRef] (NOT Array[Any]).
-  @Parameters
+  @Parameters(name = "{index} compressionType = {0}")
   def parameters: Collection[Array[String]] = {
-    val list = new ArrayList[Array[String]]()
-    list.add(Array("none"))
-    list.add(Array("gzip"))
-    list.add(Array("snappy"))
-    list.add(Array("lz4"))
-    list
+    Seq(
+      Array("none"),
+      Array("gzip"),
+      Array("snappy"),
+      Array("lz4")
+    ).asJava
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 1d3f77f..b2b9806 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -59,7 +59,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
 
   def getConsumer2() = {
     if (consumer2 == null)
-      consumer2 = new SimpleConsumer("localhost", TestUtils.boundPort(server2), 100, 64*1024, "")
+      consumer2 = new SimpleConsumer("localhost", TestUtils.boundPort(server2), 1000000, 64*1024, "")
     consumer2
   }