You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/08/05 21:49:07 UTC
kafka git commit: KAFKA-2401: Fix transient failure in
ProducerSendTest.testCloseWithZeroTimeoutFromSenderThread
Repository: kafka
Updated Branches:
refs/heads/trunk fc40016cf -> 85d8218ef
KAFKA-2401: Fix transient failure in ProducerSendTest.testCloseWithZeroTimeoutFromSenderThread
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Guozhang
Closes #113 from becketqin/KAFKA-2401 and squashes the following commits:
7d4223d [Jiangjie Qin] KAFKA-2401: fix transient failure in ProducerSendTest.testCloseWithZeroTimeoutFromSenderThread
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85d8218e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85d8218e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85d8218e
Branch: refs/heads/trunk
Commit: 85d8218ef41381c58eb16d7542e41416c82bdf4a
Parents: fc40016
Author: Jiangjie Qin <be...@gmail.com>
Authored: Wed Aug 5 12:50:05 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Aug 5 12:50:05 2015 -0700
----------------------------------------------------------------------
.../kafka/api/ProducerSendTest.scala | 25 ++++++--------------
1 file changed, 7 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/85d8218e/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 9ce4bd5..5c6ccbc 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -372,20 +372,18 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
try {
// create topic
- val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
- val leader0 = leaders(0)
- val leader1 = leaders(1)
+ val leaders = TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+ val leader = leaders(0)
// create record
- val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
- val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
// Test closing from sender thread.
class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception) {
// Trigger another batch in accumulator before close the producer. These messages should
// not be sent.
- (0 until numRecords) map (i => producer.send(record1))
+ (0 until numRecords) map (i => producer.send(record))
// The close call will be called by all the message callbacks. This tests idempotence of the close call.
producer.close(0, TimeUnit.MILLISECONDS)
// Test close with non zero timeout. Should not block at all.
@@ -395,29 +393,20 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
for(i <- 0 until 50) {
producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
// send message to partition 0
- var responses = (0 until numRecords) map (i => producer.send(record0))
- // send message to partition 1
- responses ++= ((0 until numRecords) map (i => producer.send(record1, new CloseCallback(producer))))
+ val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
// flush the messages.
producer.flush()
assertTrue("All request are complete.", responses.forall(_.isDone()))
// Check the messages received by broker.
- val fetchResponse0 = if (leader0.get == configs(0).brokerId) {
+ val fetchResponse = if (leader.get == configs(0).brokerId) {
consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
} else {
consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
}
- val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
- consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
- } else {
- consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
- }
val expectedNumRecords = (i + 1) * numRecords
assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
- expectedNumRecords, fetchResponse0.messageSet(topic, 0).size)
- assertEquals("Fetch response to partition 1 should have %d messages.".format(expectedNumRecords),
- expectedNumRecords, fetchResponse1.messageSet(topic, 1).size)
+ expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
}
} finally {
if (producer != null)