You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/03/21 05:36:45 UTC
[kafka] branch trunk updated: MINOR: Fix flaky TestUtils functions
(#4743)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f5287cc MINOR: Fix flaky TestUtils functions (#4743)
f5287cc is described below
commit f5287ccad2de7de586788ed4ee08a065a007a42e
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Tue Mar 20 22:36:41 2018 -0700
MINOR: Fix flaky TestUtils functions (#4743)
TestUtils#produceMessages should always close the KafkaProducer, even
when there is an exception. Otherwise, the test will leak threads when
there is an error.
TestUtils#createNewProducer should create a producer with a
requestTimeoutMs of 30 seconds by default, not around 10 seconds.
This should avoid tests that flake when the load on Jenkins climbs.
Fix two cases where a very short timeout of 2 seconds was getting set.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../test/scala/unit/kafka/utils/TestUtils.scala | 29 +++++++++++-----------
1 file changed, 15 insertions(+), 14 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 636c0bd..4b87406 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -555,7 +555,7 @@ object TestUtils extends Logging {
bufferSize: Long = 1024L * 1024L,
retries: Int = 0,
lingerMs: Long = 0,
- requestTimeoutMs: Long = 10 * 1024L,
+ requestTimeoutMs: Long = 30 * 1000L,
securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
trustStoreFile: Option[File] = None,
saslProperties: Option[Properties] = None,
@@ -1086,20 +1086,22 @@ object TestUtils extends Logging {
val producer = createNewProducer(
TestUtils.getBrokerListStrFromServers(servers),
retries = 5,
- requestTimeoutMs = 2000,
acks = acks
)
-
- val values = (0 until numMessages).map(x => valueBytes match {
- case -1 => s"test-$x".getBytes
- case _ => new Array[Byte](valueBytes)
- })
-
- val futures = values.map { value =>
- producer.send(new ProducerRecord(topic, value))
+ val values = try {
+ val curValues = (0 until numMessages).map(x => valueBytes match {
+ case -1 => s"test-$x".getBytes
+ case _ => new Array[Byte](valueBytes)
+ })
+
+ val futures = curValues.map { value =>
+ producer.send(new ProducerRecord(topic, value))
+ }
+ futures.foreach(_.get)
+ curValues
+ } finally {
+ producer.close()
}
- futures.foreach(_.get)
- producer.close()
debug(s"Sent ${values.size} messages for topic [$topic]")
@@ -1109,8 +1111,7 @@ object TestUtils extends Logging {
def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) {
val producer = createNewProducer(
TestUtils.getBrokerListStrFromServers(servers),
- retries = 5,
- requestTimeoutMs = 2000
+ retries = 5
)
producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
producer.close()
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.