You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/04 14:31:36 UTC
kafka git commit: KAFKA-5371; Increase request timeout for producer used by testReachableServer
Repository: kafka
Updated Branches:
refs/heads/trunk 941e2177c -> 9b58372dc
KAFKA-5371; Increase request timeout for producer used by testReachableServer
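A 500 ms request timeout is too low for a shared Jenkins environment, so it is
raised to 10000 ms, and the test's elapsed-time bounds are raised from 2000 ms
to 12000 ms to match.
Also removed the try/catch blocks that simply obscured
the underlying error.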
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Apurva Mehta <ap...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
Closes #3225 from ijuma/kafka-5371-flaky-testReachableServer
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b58372d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b58372d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b58372d
Branch: refs/heads/trunk
Commit: 9b58372dcce5dc96826d4786123513a4d8c7b39f
Parents: 941e217
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sun Jun 4 15:28:06 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sun Jun 4 15:28:10 2017 +0100
----------------------------------------------------------------------
.../unit/kafka/producer/SyncProducerTest.scala | 41 +++++++-------------
.../test/scala/unit/kafka/utils/TestUtils.scala | 3 +-
2 files changed, 16 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b58372d/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 41a8a6c..cde49de 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -52,38 +52,25 @@ class SyncProducerTest extends KafkaServerTestHarness {
@Test
def testReachableServer() {
val server = servers.head
-
val props = TestUtils.getSyncProducerConfig(boundPort(server))
-
val producer = new SyncProducer(new SyncProducerConfig(props))
+
val firstStart = Time.SYSTEM.milliseconds
- try {
- val response = producer.send(produceRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
- assertNotNull(response)
- } catch {
- case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
- }
- val firstEnd = Time.SYSTEM.milliseconds
- assertTrue((firstEnd-firstStart) < 2000)
+ var response = producer.send(produceRequest("test", 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
+ assertNotNull(response)
+ assertTrue((Time.SYSTEM.milliseconds - firstStart) < 12000)
+
val secondStart = Time.SYSTEM.milliseconds
- try {
- val response = producer.send(produceRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
- assertNotNull(response)
- } catch {
- case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
- }
- val secondEnd = Time.SYSTEM.milliseconds
- assertTrue((secondEnd-secondStart) < 2000)
- try {
- val response = producer.send(produceRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
- assertNotNull(response)
- } catch {
- case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
- }
+ response = producer.send(produceRequest("test", 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
+ assertNotNull(response)
+ assertTrue((Time.SYSTEM.milliseconds - secondStart) < 12000)
+
+ response = producer.send(produceRequest("test", 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
+ assertNotNull(response)
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b58372d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a0f4762..aae58cc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -643,11 +643,12 @@ object TestUtils extends Logging {
props
}
+ @deprecated("This method has been deprecated and will be removed in a future release", "0.11.0.0")
def getSyncProducerConfig(port: Int): Properties = {
val props = new Properties()
props.put("host", "localhost")
props.put("port", port.toString)
- props.put("request.timeout.ms", "500")
+ props.put("request.timeout.ms", "10000")
props.put("request.required.acks", "1")
props.put("serializer.class", classOf[StringEncoder].getName)
props