You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/08/05 21:49:07 UTC

kafka git commit: KAFKA-2401: Fix transient failure in ProducerSendTest.testCloseWithZeroTimeoutFromSenderThread

Repository: kafka
Updated Branches:
  refs/heads/trunk fc40016cf -> 85d8218ef


KAFKA-2401: Fix transient failure in ProducerSendTest.testCloseWithZeroTimeoutFromSenderThread

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Guozhang

Closes #113 from becketqin/KAFKA-2401 and squashes the following commits:

7d4223d [Jiangjie Qin] KAFKA-2401: fix transient failure in ProducerSendTest.testCloseWithZeroTimeoutFromSenderThread


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85d8218e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85d8218e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85d8218e

Branch: refs/heads/trunk
Commit: 85d8218ef41381c58eb16d7542e41416c82bdf4a
Parents: fc40016
Author: Jiangjie Qin <be...@gmail.com>
Authored: Wed Aug 5 12:50:05 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Aug 5 12:50:05 2015 -0700

----------------------------------------------------------------------
 .../kafka/api/ProducerSendTest.scala            | 25 ++++++--------------
 1 file changed, 7 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/85d8218e/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 9ce4bd5..5c6ccbc 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -372,20 +372,18 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
     var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
     try {
       // create topic
-      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
-      val leader0 = leaders(0)
-      val leader1 = leaders(1)
+      val leaders = TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+      val leader = leaders(0)
 
       // create record
-      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
-      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
 
       // Test closing from sender thread.
       class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
         override def onCompletion(metadata: RecordMetadata, exception: Exception) {
           // Trigger another batch in accumulator before close the producer. These messages should
           // not be sent.
-          (0 until numRecords) map (i => producer.send(record1))
+          (0 until numRecords) map (i => producer.send(record))
           // The close call will be called by all the message callbacks. This tests idempotence of the close call.
           producer.close(0, TimeUnit.MILLISECONDS)
           // Test close with non zero timeout. Should not block at all.
@@ -395,29 +393,20 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
       for(i <- 0 until 50) {
         producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
         // send message to partition 0
-        var responses = (0 until numRecords) map (i => producer.send(record0))
-        // send message to partition 1
-        responses ++= ((0 until numRecords) map (i => producer.send(record1, new CloseCallback(producer))))
+        val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
         assertTrue("No request is complete.", responses.forall(!_.isDone()))
         // flush the messages.
         producer.flush()
         assertTrue("All request are complete.", responses.forall(_.isDone()))
         // Check the messages received by broker.
-        val fetchResponse0 = if (leader0.get == configs(0).brokerId) {
+        val fetchResponse = if (leader.get == configs(0).brokerId) {
           consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
         } else {
           consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
         }
-        val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
-          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
-        } else {
-          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
-        }
         val expectedNumRecords = (i + 1) * numRecords
         assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
-          expectedNumRecords, fetchResponse0.messageSet(topic, 0).size)
-        assertEquals("Fetch response to partition 1 should have %d messages.".format(expectedNumRecords),
-          expectedNumRecords, fetchResponse1.messageSet(topic, 1).size)
+          expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
       }
     } finally {
       if (producer != null)