Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/03 20:05:40 UTC

[kafka] branch trunk updated: KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6dbd9b5  KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
6dbd9b5 is described below

commit 6dbd9b59e60eff821fe677af1c6ed33b96243153
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu May 3 21:05:36 2018 +0100

    KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
    
    Log cleaner grows buffers when result.messagesRead is zero. This field holds the number of filtered messages read from the source, which can be zero when transactions are in use because whole batches may be discarded. The log cleaner incorrectly assumes that no messages were read because the buffer was too small and attempts to double the buffer size unnecessarily, failing with an exception if the buffer is already at max.message.bytes. An additional check for discarded batches has been added to avoid growing [...]
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
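
To make the failure mode concrete, here is a minimal sketch in Scala of the check this commit changes. The names (FilterResult, shouldGrowBuffer, readLimit) are simplified stand-ins for illustration, not the actual cleaner code:

    // Simplified stand-in for the cleaner's per-pass read statistics.
    case class FilterResult(messagesRead: Int, bytesRead: Int)

    def shouldGrowBuffer(readLimit: Int, result: FilterResult): Boolean = {
      // Old check: readLimit > 0 && result.messagesRead == 0
      // This misfires when whole batches are read and then discarded (e.g.
      // aborted transactional batches): messagesRead is zero even though the
      // buffer was large enough to hold complete batches.
      // New check: grow only when no bytes at all were processed, meaning the
      // buffer genuinely did not contain one complete batch.
      readLimit > 0 && result.bytesRead == 0
    }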
---
 .../apache/kafka/common/record/MemoryRecords.java  |  4 ++-
 core/src/main/scala/kafka/log/LogCleaner.scala     |  5 ++--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 33 ++++++++++++++++++++++
 3 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index da6b68c..ea6aa4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -145,7 +145,7 @@ public class MemoryRecords extends AbstractRecords {
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
         int messagesRead = 0;
-        int bytesRead = 0;
+        int bytesRead = 0; // bytes processed from `batches`
         int messagesRetained = 0;
         int bytesRetained = 0;
 
@@ -359,6 +359,8 @@ public class MemoryRecords extends AbstractRecords {
         public final long maxTimestamp;
         public final long shallowOffsetOfMaxTimestamp;
 
+        // Note that `bytesRead` should contain only bytes from batches that have been processed,
+        // i.e. bytes from `messagesRead` and any discarded batches.
         public FilterResult(ByteBuffer output,
                             int messagesRead,
                             int bytesRead,
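
The comment added above pins down what `bytesRead` must count. A hedged sketch of that accounting, with `Batch` as a hypothetical stand-in type (the real filterTo logic differs in detail):

    // Hypothetical illustration of the intended accounting: a batch's bytes
    // count toward bytesRead even when the batch is discarded whole, while
    // messagesRead only grows for records actually read out of kept batches.
    final case class Batch(sizeInBytes: Int, recordsRead: Int)

    def tally(batches: Seq[Batch]): (Int, Int) = {
      var messagesRead = 0
      var bytesRead = 0
      for (batch <- batches) {
        bytesRead += batch.sizeInBytes    // counted even for discarded batches
        messagesRead += batch.recordsRead // zero for a batch discarded whole
      }
      (messagesRead, bytesRead)
    }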
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 2e32250..ee31274 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -620,8 +620,9 @@ private[log] class Cleaner(val id: Int,
         throttler.maybeThrottle(outputBuffer.limit())
       }
 
-      // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
-      if (readBuffer.limit() > 0 && result.messagesRead == 0)
+      // if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
+      // `result.bytesRead` contains bytes from the `messagesRead` messages and any discarded batches.
+      if (readBuffer.limit() > 0 && result.bytesRead == 0)
         growBuffers(maxLogMessageSize)
     }
     restoreBuffers()
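
For completeness, a sketch of the doubling behaviour that made the old check fatal, assuming the cap at max.message.bytes described in the commit message (illustrative only; not the real growBuffers):

    // Illustrative buffer growth: double the size, capped at maxLogMessageSize,
    // and fail once the buffer is already at the maximum. This is the failure
    // the commit avoids when discarded batches left messagesRead at zero.
    def grownBufferSize(current: Int, maxLogMessageSize: Int): Int = {
      if (current >= maxLogMessageSize)
        throw new IllegalStateException(
          s"Buffer of size $current cannot grow beyond max.message.bytes ($maxLogMessageSize)")
      math.min(current * 2, maxLogMessageSize)
    }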
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 906c26d..edc1744 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -300,6 +300,39 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
   }
 
+  /**
+   * Tests log cleaning when all the batches read into the buffer are deleted, so that no
+   * messages are available to read. Cleaning should continue from the next offset.
+   */
+  @Test
+  def testDeletedBatchesWithNoMessagesRead(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(capacity = Int.MaxValue, maxMessageSize = 100)
+    val logProps = new Properties()
+    logProps.put(LogConfig.MaxMessageBytesProp, 100: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+    appendProducer(Seq(1))
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    appendProducer(Seq(2))
+    appendProducer(Seq(2))
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.roll()
+
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(2), keysInLog(log))
+    assertEquals(List(1, 3, 4), offsetsInLog(log))
+
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(2), keysInLog(log))
+    assertEquals(List(3, 4), offsetsInLog(log))
+  }
+
   @Test
   def testCommitMarkerRetentionWithEmptyBatch(): Unit = {
     val tp = new TopicPartition("test", 0)
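
For readers tracing the new test's assertions, here is the offset layout implied by the appends (inferred, assuming one record per appendProducer call and one offset per control marker; not stated in the commit):

    // Inferred contents of the test log before cleaning:
    //   offset 0: record key=1 (aborted)    removed by the first doClean
    //   offset 1: abort marker              removed by the second doClean
    //   offset 2: record key=2 (duplicate)  removed by the first doClean
    //   offset 3: record key=2              retained
    //   offset 4: commit marker             retained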
