Posted to commits@kafka.apache.org by ij...@apache.org on 2018/10/27 19:30:20 UTC
[kafka] branch trunk updated: MINOR: Make log cleaner tests more
efficient and less flaky (#5836)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ccfcbfd MINOR: Make log cleaner tests more efficient and less flaky (#5836)
ccfcbfd is described below
commit ccfcbfd13f6f072e8d5b00b52525ac1184a30421
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Sat Oct 27 22:30:06 2018 +0300
MINOR: Make log cleaner tests more efficient and less flaky (#5836)
`testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics` sometimes fails
because the 15-second timeout expires. The error message from the build failure
shows that the timeout occurs in the writeDups() calls, which call roll().
```text
[2018-10-23 15:18:51,018] ERROR Error while flushing log for log-1 in dir /tmp/kafka-8190355063195903574 with offset 74 (kafka.server.LogDirFailureChannel:76)
java.nio.channels.ClosedByInterruptException
...
at kafka.log.Log.roll(Log.scala:1550)
...
at kafka.log.AbstractLogCleanerIntegrationTest.writeDups(AbstractLogCleanerIntegrationTest.scala:132)
...
```
After investigating, I saw that this test called Log#roll() around 60 times per run.
Increasing the segmentSize config to `2048` reduces the number of Log#roll() calls
while still ensuring that multiple rolls occur.
Most other LogCleaner tests also call roll() ~90 times, so I've changed the
default to `2048`. The one test that requires a smaller segmentSize now sets it
via the args.
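To illustrate why a larger segment size cuts the number of rolls, here is a minimal sketch under stated assumptions. It is not Kafka code: `RollEstimate`, `countRolls`, `rollsFor` and the 64-byte record size are hypothetical, and the model simply rolls a segment whenever the next record would not fit. It also mirrors the pattern used in this change, a larger default that a single caller overrides with a named argument.

```scala
// Simplified model for illustration only; the real Log#roll() logic has more triggers.
object RollEstimate {
  // Hypothetical helper: counts rolls for `records` appends of `recordSize` bytes each.
  def countRolls(records: Int, recordSize: Int, segmentSize: Int): Int = {
    require(recordSize <= segmentSize, "a record must fit in one segment")
    var rolls = 0
    var used = 0
    for (_ <- 1 to records) {
      if (used + recordSize > segmentSize) { // active segment is full, start a new one
        rolls += 1
        used = 0
      }
      used += recordSize
    }
    rolls
  }

  // Larger default segment size, overridable per call (record size is an assumption).
  def rollsFor(records: Int, recordSize: Int = 64, segmentSize: Int = 2048): Int =
    countRolls(records, recordSize, segmentSize)

  def main(args: Array[String]): Unit = {
    println(rollsFor(records = 300))                    // prints 9
    println(rollsFor(records = 300, segmentSize = 256)) // prints 74
  }
}
```

In this toy model, raising the segment size from 256 to 2048 bytes drops the roll count from 74 to 9 for the same 300 records, while still producing several rolls. The absolute numbers depend on the assumed record size and are not meant to match the counts observed in the tests.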
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala | 2 +-
.../scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 2a483fa..cc35f1d 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -42,7 +42,7 @@ abstract class AbstractLogCleanerIntegrationTest {
private val defaultMinCleanableDirtyRatio = 0.0F
private val defaultCompactionLag = 0L
private val defaultDeleteDelay = 1000
- private val defaultSegmentSize = 256
+ private val defaultSegmentSize = 2048
def time: MockTime
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 232cfdb..61e3ea5 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -190,10 +190,10 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A
return
val maxMessageSize = 192
- cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
+ cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, segmentSize = 256)
val log = cleaner.logs.get(topicPartitions(0))
- val props = logConfigProperties(maxMessageSize = maxMessageSize)
+ val props = logConfigProperties(maxMessageSize = maxMessageSize, segmentSize = 256)
props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
log.config = new LogConfig(props)