You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/04 14:31:36 UTC

kafka git commit: KAFKA-5371; Increase request timeout for producer used by testReachableServer

Repository: kafka
Updated Branches:
  refs/heads/trunk 941e2177c -> 9b58372dc


KAFKA-5371; Increase request timeout for producer used by testReachableServer

500ms is low for a shared Jenkins environment.

Also removed the try/catch blocks that simply obscured
the underlying error.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Apurva Mehta <ap...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>

Closes #3225 from ijuma/kafka-5371-flaky-testReachableServer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b58372d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b58372d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b58372d

Branch: refs/heads/trunk
Commit: 9b58372dcce5dc96826d4786123513a4d8c7b39f
Parents: 941e217
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sun Jun 4 15:28:06 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sun Jun 4 15:28:10 2017 +0100

----------------------------------------------------------------------
 .../unit/kafka/producer/SyncProducerTest.scala  | 41 +++++++-------------
 .../test/scala/unit/kafka/utils/TestUtils.scala |  3 +-
 2 files changed, 16 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9b58372d/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 41a8a6c..cde49de 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -52,38 +52,25 @@ class SyncProducerTest extends KafkaServerTestHarness {
   @Test
   def testReachableServer() {
     val server = servers.head
-
     val props = TestUtils.getSyncProducerConfig(boundPort(server))
 
-
     val producer = new SyncProducer(new SyncProducerConfig(props))
+
     val firstStart = Time.SYSTEM.milliseconds
-    try {
-      val response = producer.send(produceRequest("test", 0,
-        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
-      assertNotNull(response)
-    } catch {
-      case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
-    }
-    val firstEnd = Time.SYSTEM.milliseconds
-    assertTrue((firstEnd-firstStart) < 2000)
+    var response = producer.send(produceRequest("test", 0,
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
+    assertNotNull(response)
+    assertTrue((Time.SYSTEM.milliseconds - firstStart) < 12000)
+
     val secondStart = Time.SYSTEM.milliseconds
-    try {
-      val response = producer.send(produceRequest("test", 0,
-        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
-      assertNotNull(response)
-    } catch {
-      case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
-    }
-    val secondEnd = Time.SYSTEM.milliseconds
-    assertTrue((secondEnd-secondStart) < 2000)
-    try {
-      val response = producer.send(produceRequest("test", 0,
-        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
-      assertNotNull(response)
-    } catch {
-      case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
-    }
+    response = producer.send(produceRequest("test", 0,
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
+    assertNotNull(response)
+    assertTrue((Time.SYSTEM.milliseconds - secondStart) < 12000)
+
+    response = producer.send(produceRequest("test", 0,
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
+    assertNotNull(response)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b58372d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a0f4762..aae58cc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -643,11 +643,12 @@ object TestUtils extends Logging {
     props
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release", "0.11.0.0")
   def getSyncProducerConfig(port: Int): Properties = {
     val props = new Properties()
     props.put("host", "localhost")
     props.put("port", port.toString)
-    props.put("request.timeout.ms", "500")
+    props.put("request.timeout.ms", "10000")
     props.put("request.required.acks", "1")
     props.put("serializer.class", classOf[StringEncoder].getName)
     props