You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/03/21 05:36:45 UTC

[kafka] branch trunk updated: MINOR: Fix flaky TestUtils functions (#4743)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f5287cc  MINOR: Fix flaky TestUtils functions (#4743)
f5287cc is described below

commit f5287ccad2de7de586788ed4ee08a065a007a42e
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Tue Mar 20 22:36:41 2018 -0700

    MINOR: Fix flaky TestUtils functions (#4743)
    
    TestUtils#produceMessages should always close the KafkaProducer, even
    when there is an exception.  Otherwise, the test will leak threads when
    there is an error.
    
    TestUtils#createNewProducer should create a producer with a
    requestTimeoutMs of 30 seconds (30 * 1000 ms) by default, instead of
    the previous 10 * 1024 ms (roughly 10 seconds).
    This should avoid tests that flake when the load on Jenkins climbs.
    
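    Remove the explicit, very short requestTimeoutMs of 2 seconds that was
    being set in TestUtils#produceMessages and TestUtils#produceMessage, so
    that both now use the createNewProducer default.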
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 29 +++++++++++-----------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 636c0bd..4b87406 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -555,7 +555,7 @@ object TestUtils extends Logging {
                         bufferSize: Long = 1024L * 1024L,
                         retries: Int = 0,
                         lingerMs: Long = 0,
-                        requestTimeoutMs: Long = 10 * 1024L,
+                        requestTimeoutMs: Long = 30 * 1000L,
                         securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
                         trustStoreFile: Option[File] = None,
                         saslProperties: Option[Properties] = None,
@@ -1086,20 +1086,22 @@ object TestUtils extends Logging {
     val producer = createNewProducer(
       TestUtils.getBrokerListStrFromServers(servers),
       retries = 5,
-      requestTimeoutMs = 2000,
       acks = acks
     )
-
-    val values = (0 until numMessages).map(x => valueBytes match {
-      case -1 => s"test-$x".getBytes
-      case _ => new Array[Byte](valueBytes)
-    })
-
-    val futures = values.map { value =>
-      producer.send(new ProducerRecord(topic, value))
+    val values = try {
+      val curValues = (0 until numMessages).map(x => valueBytes match {
+        case -1 => s"test-$x".getBytes
+        case _ => new Array[Byte](valueBytes)
+      })
+
+      val futures = curValues.map { value =>
+        producer.send(new ProducerRecord(topic, value))
+      }
+      futures.foreach(_.get)
+      curValues 
+    } finally {
+      producer.close()
     }
-    futures.foreach(_.get)
-    producer.close()
 
     debug(s"Sent ${values.size} messages for topic [$topic]")
 
@@ -1109,8 +1111,7 @@ object TestUtils extends Logging {
   def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) {
     val producer = createNewProducer(
       TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5,
-      requestTimeoutMs = 2000
+      retries = 5
     )
     producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
     producer.close()

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.