You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/13 20:51:46 UTC
kafka git commit: KAFKA-2837: Fix transient failure of kafka.api.ProducerBounceTest.testBrokerFailure
Repository: kafka
Updated Branches:
refs/heads/trunk fd8af25d0 -> 3fed57909
KAFKA-2837: Fix transient failure of kafka.api.ProducerBounceTest.testBrokerFailure
I can reproduce this transient failure, although it seldom happens.
The relevant test code is:
// rolling bounce brokers
for (i <- 0 until numServers) {
for (server <- servers) {
server.shutdown()
server.awaitShutdown()
server.startup()
Thread.sleep(2000)
}
// Make sure the producer do not see any exception
// in returned metadata due to broker failures
assertTrue(scheduler.failed == false)
// Make sure the leader still exists after bouncing brokers
(0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, partition))
The brokers are restarted one after another (a rolling bounce) while the producers keep sending messages.
In every iteration, the test waits for each partition leader to be elected.
If the election is slow, more messages are buffered in the RecordAccumulator's BufferPool.
The buffer limit is set to 30000 bytes.
When that buffer is exhausted, a TimeoutException("Failed to allocate memory within the configured max blocking time") is thrown.
Because the test sleeps for 2000 ms after every broker restart, this transient failure seldom happens.
Reducing the sleep period makes the failure much more likely.
For example, if the broker acting as the controller is restarted, a new controller must be elected first and only then the partition leaders, so even more messages pile up in the KafkaProducer's RecordAccumulator BufferPool.
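In this fix, I simply enlarge the producer's buffer size to 1 MB.
(Note: the diff as merged does not change `producerBufferSize`. Instead, it replaces the `metadataFetchTimeout`/`blockOnBufferFull` parameters of `TestUtils.createNewProducer` with a single `maxBlockMs` parameter (default `Long.MaxValue`, mapped to `ProducerConfig.MAX_BLOCK_MS_CONFIG`). It also drops `blockOnBufferFull = false` from the ProducerBounceTest producers, and ProducerFailureHandlingTest now passes `maxBlockMs = 3000L`.)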
guozhangwang, could you take a look and give some comments?
Author: jinxing <ji...@fenbi.com>
Author: ZoneMayor <ji...@126.com>
Reviewers: Guozhang Wang
Closes #648 from ZoneMayor/trunk-KAFKA-2837
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3fed5790
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3fed5790
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3fed5790
Branch: refs/heads/trunk
Commit: 3fed57909c16f807715f305152a5034aeb1a5532
Parents: fd8af25
Author: Jin Xing <ji...@fenbi.com>
Authored: Sun Dec 13 11:51:42 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Dec 13 11:51:42 2015 -0800
----------------------------------------------------------------------
.../scala/integration/kafka/api/ProducerBounceTest.scala | 6 +++---
.../integration/kafka/api/ProducerFailureHandlingTest.scala | 8 ++++----
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 5 ++---
3 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3fed5790/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 29e146e..369c3b7 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -66,9 +66,9 @@ class ProducerBounceTest extends KafkaServerTestHarness {
override def setUp() {
super.setUp()
- producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
- producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
- producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+ producer1 = TestUtils.createNewProducer(brokerList, acks = 0, bufferSize = producerBufferSize)
+ producer2 = TestUtils.createNewProducer(brokerList, acks = 1, bufferSize = producerBufferSize)
+ producer3 = TestUtils.createNewProducer(brokerList, acks = -1, bufferSize = producerBufferSize)
}
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/3fed5790/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 8ba7fad..7b0910c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -63,9 +63,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
override def setUp() {
super.setUp()
- producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
- producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
- producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+ producer1 = TestUtils.createNewProducer(brokerList, acks = 0, maxBlockMs = 3000L, bufferSize = producerBufferSize)
+ producer2 = TestUtils.createNewProducer(brokerList, acks = 1, maxBlockMs = 3000L, bufferSize = producerBufferSize)
+ producer3 = TestUtils.createNewProducer(brokerList, acks = -1, maxBlockMs = 3000L, bufferSize = producerBufferSize)
}
@After
@@ -134,7 +134,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
// producer with incorrect broker list
- producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+ producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, maxBlockMs = 3000L, bufferSize = producerBufferSize)
// send a record with incorrect broker list
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3fed5790/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7c928c5..ecec866 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -455,8 +455,7 @@ object TestUtils extends Logging {
*/
def createNewProducer(brokerList: String,
acks: Int = -1,
- metadataFetchTimeout: Long = 3000L,
- blockOnBufferFull: Boolean = true,
+ maxBlockMs: Long = Long.MaxValue,
bufferSize: Long = 1024L * 1024L,
retries: Int = 0,
lingerMs: Long = 0,
@@ -468,7 +467,7 @@ object TestUtils extends Logging {
val producerProps = props.getOrElse(new Properties)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
- producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
+ producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)