You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/03/21 05:37:54 UTC

[kafka] branch 1.0 updated: MINOR: Fix flaky TestUtils functions (#4743)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 1f24a31  MINOR: Fix flaky TestUtils functions (#4743)
1f24a31 is described below

commit 1f24a315b6e1ea25e45a44cea3155e5d0d65804e
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Tue Mar 20 22:36:41 2018 -0700

    MINOR: Fix flaky TestUtils functions (#4743)
    
    TestUtils#produceMessages should always close the KafkaProducer, even
    when there is an exception.  Otherwise, the test will leak threads when
    there is an error.
    
    TestUtils#createNewProducer should create a producer with a
    requestTimeoutMs of 30 seconds by default, instead of the previous
    10 * 1024 ms (roughly 10 seconds).  This should avoid tests that
    flake when the load on Jenkins climbs.
    
    Remove the explicit 2-second requestTimeoutMs override in
    TestUtils#produceMessages and TestUtils#produceMessage, so that both
    use the 30-second default.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 29 +++++++++++-----------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f1bd7d2..b45a841 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -549,7 +549,7 @@ object TestUtils extends Logging {
                         bufferSize: Long = 1024L * 1024L,
                         retries: Int = 0,
                         lingerMs: Long = 0,
-                        requestTimeoutMs: Long = 10 * 1024L,
+                        requestTimeoutMs: Long = 30 * 1000L,
                         securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
                         trustStoreFile: Option[File] = None,
                         saslProperties: Option[Properties] = None,
@@ -1092,20 +1092,22 @@ object TestUtils extends Logging {
     val producer = createNewProducer(
       TestUtils.getBrokerListStrFromServers(servers),
       retries = 5,
-      requestTimeoutMs = 2000,
       acks = acks
     )
-
-    val values = (0 until numMessages).map(x => valueBytes match {
-      case -1 => s"test-$x".getBytes
-      case _ => new Array[Byte](valueBytes)
-    })
-
-    val futures = values.map { value =>
-      producer.send(new ProducerRecord(topic, value))
+    val values = try {
+      val curValues = (0 until numMessages).map(x => valueBytes match {
+        case -1 => s"test-$x".getBytes
+        case _ => new Array[Byte](valueBytes)
+      })
+
+      val futures = curValues.map { value =>
+        producer.send(new ProducerRecord(topic, value))
+      }
+      futures.foreach(_.get)
+      curValues 
+    } finally {
+      producer.close()
     }
-    futures.foreach(_.get)
-    producer.close()
 
     debug(s"Sent ${values.size} messages for topic [$topic]")
 
@@ -1115,8 +1117,7 @@ object TestUtils extends Logging {
   def produceMessage(servers: Seq[KafkaServer], topic: String, partition: Integer, message: String) {
     val producer = createNewProducer(
       TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5,
-      requestTimeoutMs = 2000
+      retries = 5
     )
     producer.send(new ProducerRecord(topic, partition, topic.getBytes, message.getBytes)).get
     producer.close()

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.