Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/13 20:51:46 UTC

kafka git commit: KAFKA-2837: Fix transient failure of kafka.api.ProducerBounceTest.testBrokerFailure

Repository: kafka
Updated Branches:
  refs/heads/trunk fd8af25d0 -> 3fed57909


KAFKA-2837: Fix transient failure of kafka.api.ProducerBounceTest.testBrokerFailure

I was able to reproduce this transient failure, though it happens only rarely.
The relevant test code is shown below:
    // rolling bounce brokers
    for (i <- 0 until numServers) {
      for (server <- servers) {
        server.shutdown()
        server.awaitShutdown()
        server.startup()
        Thread.sleep(2000)
      }

      // Make sure the producer does not see any exception
      // in the returned metadata due to broker failures
      assertTrue(scheduler.failed == false)

      // Make sure the leader still exists after bouncing brokers
      (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, partition))
    }
The brokers go through a rolling restart while the producer keeps sending messages.
In every iteration of the loop the test waits for the partition leaders to be elected.
If the election is slow, more and more messages are buffered in the RecordAccumulator's BufferPool.
The buffer limit is set to 30000 bytes, so once the buffer runs out of memory a
TimeoutException("Failed to allocate memory within the configured max blocking time") shows up.
Since the test sleeps for 2000 ms after every broker restart, this transient failure rarely happens,
but the shorter the sleep period, the more likely the failure becomes.
For example, if the broker acting as controller is restarted, a new controller has to be elected before the partition leader can be elected, which leaves even more messages blocked in the KafkaProducer's RecordAccumulator BufferPool.
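To make the failure mode concrete, below is a minimal sketch of how a small producer buffer plus a bounded max blocking time leads to this exception. This is not the test code itself; the broker address, topic name, record count, serializers and config values are illustrative assumptions.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // illustrative broker list
props.put(ProducerConfig.ACKS_CONFIG, "1")
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "30000")  // small buffer, like the old test setting
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3000")    // bounded blocking time (illustrative)

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)

// While a bounced broker has no elected leader, records only pile up in the
// RecordAccumulator's BufferPool; once the 30000 bytes are used up, a send()
// can block for at most max.block.ms before failing with
// TimeoutException("Failed to allocate memory within the configured max blocking time"),
// surfacing either from send() or from the returned future's metadata.
(0 until 10000).foreach { i =>
  producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("topic1", s"msg-$i".getBytes))
}
producer.close()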
In this fix I simply enlarge the producer's buffer size to 1 MB.
guozhangwang, could you give some comments?

Author: jinxing <ji...@fenbi.com>
Author: ZoneMayor <ji...@126.com>

Reviewers: Guozhang Wang

Closes #648 from ZoneMayor/trunk-KAFKA-2837


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3fed5790
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3fed5790
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3fed5790

Branch: refs/heads/trunk
Commit: 3fed57909c16f807715f305152a5034aeb1a5532
Parents: fd8af25
Author: Jin Xing <ji...@fenbi.com>
Authored: Sun Dec 13 11:51:42 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Dec 13 11:51:42 2015 -0800

----------------------------------------------------------------------
 .../scala/integration/kafka/api/ProducerBounceTest.scala     | 6 +++---
 .../integration/kafka/api/ProducerFailureHandlingTest.scala  | 8 ++++----
 core/src/test/scala/unit/kafka/utils/TestUtils.scala         | 5 ++---
 3 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3fed5790/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 29e146e..369c3b7 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -66,9 +66,9 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   override def setUp() {
     super.setUp()
 
-    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
-    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
-    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, bufferSize = producerBufferSize)
+    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, bufferSize = producerBufferSize)
+    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, bufferSize = producerBufferSize)
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/3fed5790/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 8ba7fad..7b0910c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -63,9 +63,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   override def setUp() {
     super.setUp()
 
-    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
-    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
-    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, maxBlockMs = 3000L, bufferSize = producerBufferSize)
+    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, maxBlockMs = 3000L, bufferSize = producerBufferSize)
+    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, maxBlockMs = 3000L, bufferSize = producerBufferSize)
   }
 
   @After
@@ -134,7 +134,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
 
     // producer with incorrect broker list
-    producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+    producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, maxBlockMs = 3000L, bufferSize = producerBufferSize)
 
     // send a record with incorrect broker list
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3fed5790/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7c928c5..ecec866 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -455,8 +455,7 @@ object TestUtils extends Logging {
    */
   def createNewProducer(brokerList: String,
                         acks: Int = -1,
-                        metadataFetchTimeout: Long = 3000L,
-                        blockOnBufferFull: Boolean = true,
+                        maxBlockMs: Long = Long.MaxValue,
                         bufferSize: Long = 1024L * 1024L,
                         retries: Int = 0,
                         lingerMs: Long = 0,
@@ -468,7 +467,7 @@ object TestUtils extends Logging {
     val producerProps = props.getOrElse(new Properties)
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
-    producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
+    producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)