You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/31 15:13:33 UTC
kafka git commit: MINOR: Use new consumer in ProducerCompressionTest
Repository: kafka
Updated Branches:
refs/heads/trunk aebba89a2 -> 9323a7533
MINOR: Use new consumer in ProducerCompressionTest
This should be less flaky as it has a higher timeout. I also increased the timeout
in a couple of other tests that had a very low (100 ms) timeouts.
The failure would manifest itself as:
```text
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:85)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:100)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:84)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:133)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:133)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:133)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:132)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:131)
at kafka.api.test.ProducerCompressionTest.testCompression(ProducerCompressionTest.scala:97)
```
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #3178 from ijuma/producer-compression-test-flaky
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9323a753
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9323a753
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9323a753
Branch: refs/heads/trunk
Commit: 9323a753355abcb62fe8121303e5e65dbfe018ea
Parents: aebba89
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed May 31 16:13:30 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 31 16:13:30 2017 +0100
----------------------------------------------------------------------
.../kafka/api/BaseProducerSendTest.scala | 6 +-
.../kafka/api/ProducerBounceTest.scala | 2 +-
.../kafka/api/ProducerCompressionTest.scala | 88 +++++++++-----------
.../unit/kafka/producer/ProducerTest.scala | 2 +-
4 files changed, 44 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 42e3b11..645f6ac 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -187,13 +187,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
try {
TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
- val recordAndFutures = for (i <- 1 to numRecords) yield {
+ val futures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8),
s"value$i".getBytes(StandardCharsets.UTF_8))
- (record, producer.send(record))
+ producer.send(record)
}
producer.close(timeoutMs, TimeUnit.MILLISECONDS)
- val lastOffset = recordAndFutures.foldLeft(0) { case (offset, (record, future)) =>
+ val lastOffset = futures.foldLeft(0) { (offset, future) =>
val recordMetadata = future.get
assertEquals(topic, recordMetadata.topic)
assertEquals(partition, recordMetadata.partition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 5fead18..9fe0e5c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -101,7 +101,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i))
val fetchResponses = newLeaders.zipWithIndex.map { case (leader, partition) =>
// Consumers must be instantiated after all the restarts since they use random ports each time they start up
- val consumer = new SimpleConsumer("localhost", boundPort(servers(leader)), 100, 1024 * 1024, "")
+ val consumer = new SimpleConsumer("localhost", boundPort(servers(leader)), 30000, 1024 * 1024, "")
val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
consumer.close
response
http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 2001095..23b78b0 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -17,7 +17,8 @@
package kafka.api.test
-import java.util.{ArrayList, Collection, Properties}
+import java.util.{Collection, Collections, Properties}
+import scala.collection.JavaConverters._
import org.junit.runners.Parameterized
import org.junit.runner.RunWith
@@ -25,30 +26,27 @@ import org.junit.runners.Parameterized.Parameters
import org.junit.{After, Before, Test}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.junit.Assert._
-import kafka.api.FetchRequestBuilder
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.consumer.SimpleConsumer
-import kafka.message.Message
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils
-
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.serialization.ByteArraySerializer
@RunWith(value = classOf[Parameterized])
class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness {
- private val brokerId = 0
- private var server: KafkaServer = null
+ private val brokerId = 0
private val topic = "topic"
private val numRecords = 2000
+ private var server: KafkaServer = null
+
@Before
override def setUp() {
super.setUp()
-
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
- val config = KafkaConfig.fromProps(props)
-
- server = TestUtils.createServer(config)
+ server = TestUtils.createServer(KafkaConfig.fromProps(props))
}
@After
@@ -65,15 +63,14 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
@Test
def testCompression() {
- val props = new Properties()
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(Seq(server)))
- props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
- props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
- var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
- val consumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 100, 1024*1024, "")
+ val producerProps = new Properties()
+ val bootstrapServers = TestUtils.getBrokerListStrFromServers(Seq(server))
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+ producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
+ producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
+ producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200")
+ val producer = new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer)
+ val consumer = TestUtils.createNewConsumer(bootstrapServers, securityProtocol = SecurityProtocol.PLAINTEXT)
try {
// create topic
@@ -81,50 +78,43 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
val partition = 0
// prepare the messages
- val messages = for (i <-0 until numRecords)
- yield ("value" + i).getBytes
+ val messageValues = (0 until numRecords).map(i => "value" + i)
// make sure the returned messages are correct
val now = System.currentTimeMillis()
- val responses = for (message <- messages)
- yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, now, null, message))
- val futures = responses.toList
- for ((future, offset) <- futures zip (0 until numRecords)) {
+ val responses = for (message <- messageValues)
+ yield producer.send(new ProducerRecord(topic, null, now, null, message.getBytes))
+ for ((future, offset) <- responses.zipWithIndex) {
assertEquals(offset.toLong, future.get.offset)
}
+ val tp = new TopicPartition(topic, partition)
// make sure the fetched message count match
- val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
- val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer
- assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size)
-
- var index = 0
- for (message <- messages) {
- assertEquals(new Message(bytes = message, now, Message.MagicValue_V1), messageSet(index).message)
- assertEquals(index.toLong, messageSet(index).offset)
- index += 1
+ consumer.assign(Collections.singleton(tp))
+ consumer.seek(tp, 0)
+ val records = TestUtils.consumeRecords(consumer, numRecords)
+
+ for (((messageValue, record), index) <- messageValues.zip(records).zipWithIndex) {
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(now, record.timestamp)
+ assertEquals(index.toLong, record.offset)
}
} finally {
- if (producer != null) {
- producer.close()
- producer = null
- }
- if (consumer != null)
- consumer.close()
+ producer.close()
+ consumer.close()
}
}
}
object ProducerCompressionTest {
- // NOTE: Must return collection of Array[AnyRef] (NOT Array[Any]).
- @Parameters
+ @Parameters(name = "{index} compressionType = {0}")
def parameters: Collection[Array[String]] = {
- val list = new ArrayList[Array[String]]()
- list.add(Array("none"))
- list.add(Array("gzip"))
- list.add(Array("snappy"))
- list.add(Array("lz4"))
- list
+ Seq(
+ Array("none"),
+ Array("gzip"),
+ Array("snappy"),
+ Array("lz4")
+ ).asJava
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 1d3f77f..b2b9806 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -59,7 +59,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
def getConsumer2() = {
if (consumer2 == null)
- consumer2 = new SimpleConsumer("localhost", TestUtils.boundPort(server2), 100, 64*1024, "")
+ consumer2 = new SimpleConsumer("localhost", TestUtils.boundPort(server2), 1000000, 64*1024, "")
consumer2
}