Posted to commits@kafka.apache.org by ij...@apache.org on 2018/10/27 19:30:20 UTC

[kafka] branch trunk updated: MINOR: Make log cleaner tests more efficient and less flaky (#5836)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ccfcbfd  MINOR: Make log cleaner tests more efficient and less flaky (#5836)
ccfcbfd is described below

commit ccfcbfd13f6f072e8d5b00b52525ac1184a30421
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Sat Oct 27 22:30:06 2018 +0300

    MINOR: Make log cleaner tests more efficient and less flaky (#5836)
    
    `testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics` sometimes fails
    because the 15-second timeout expires. Inspecting the error message from the build
    failure, we see that the timeout happens in the writeDups() calls, which call roll().
    
    ```text
    [2018-10-23 15:18:51,018] ERROR Error while flushing log for log-1 in dir /tmp/kafka-8190355063195903574 with offset 74 (kafka.server.LogDirFailureChannel:76)
    java.nio.channels.ClosedByInterruptException
    ...
    	at kafka.log.Log.roll(Log.scala:1550)
    ...
    	at kafka.log.AbstractLogCleanerIntegrationTest.writeDups(AbstractLogCleanerIntegrationTest.scala:132)
    ...
    ```
    
    After investigating, I saw that this test calls Log#roll() around 60 times per run.
    Increasing the segmentSize config to `2048` reduces the number of Log#roll() calls
    while still ensuring that multiple rolls occur.
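
    For intuition, here is a back-of-the-envelope sketch of why segment size drives
    the roll count. The total byte count below is a hypothetical figure chosen to
    match the ~60 rolls observed, not a number taken from the test:

    ```scala
    // Each Log#roll() happens roughly once per filled segment, so the roll
    // count is approximately bytesWritten / segmentSize (integer division).
    val bytesWritten = 15 * 1024           // hypothetical total written by writeDups()
    val rollsAt256   = bytesWritten / 256  // = 60, matching the observed count
    val rollsAt2048  = bytesWritten / 2048 // = 7, still several segment rolls
    ```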
    
    Most other LogCleaner tests also call roll() around 90 times, so I've changed the
    default segmentSize to `2048` as well. The one test that requires a smaller
    segmentSize now sets it explicitly via the method arguments.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala | 2 +-
 .../scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 2a483fa..cc35f1d 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -42,7 +42,7 @@ abstract class AbstractLogCleanerIntegrationTest {
   private val defaultMinCleanableDirtyRatio = 0.0F
   private val defaultCompactionLag = 0L
   private val defaultDeleteDelay = 1000
-  private val defaultSegmentSize = 256
+  private val defaultSegmentSize = 2048
 
   def time: MockTime
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 232cfdb..61e3ea5 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -190,10 +190,10 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A
       return
 
     val maxMessageSize = 192
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, segmentSize = 256)
 
     val log = cleaner.logs.get(topicPartitions(0))
-    val props = logConfigProperties(maxMessageSize = maxMessageSize)
+    val props = logConfigProperties(maxMessageSize = maxMessageSize, segmentSize = 256)
     props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
     log.config = new LogConfig(props)
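
A note on the pattern the diff relies on: both helpers expose segmentSize as a
named parameter backed by the class-level default, so most tests inherit `2048`
while this one opts back into `256`. The sketch below illustrates that
default-with-override shape; the helper signature is an assumption (only the
call sites in the diff above come from the commit):

```scala
import java.util.Properties

// Sketch only: the real logConfigProperties signature is an assumption;
// this just shows how a class-level default lets one test opt back into
// small segments while the rest inherit 2048.
object SegmentSizeDefaults {
  private val defaultSegmentSize = 2048

  def logConfigProperties(maxMessageSize: Int,
                          segmentSize: Int = defaultSegmentSize): Properties = {
    val props = new Properties()
    props.put("max.message.bytes", maxMessageSize.toString) // LogConfig.MaxMessageBytesProp
    props.put("segment.bytes", segmentSize.toString)        // LogConfig.SegmentBytesProp
    props
  }

  // Most tests:  logConfigProperties(maxMessageSize = 192)                    // 2048-byte segments
  // This test:   logConfigProperties(maxMessageSize = 192, segmentSize = 256) // opts back in
}
```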