You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/02 02:52:54 UTC

[kafka] branch 1.1 updated: KAFKA-6492; Fix log truncation to empty segment

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 1fcf67f  KAFKA-6492; Fix log truncation to empty segment
1fcf67f is described below

commit 1fcf67f5fb21999ae345ed0f6c72416a0a38ba89
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Feb 1 11:57:52 2018 -0800

    KAFKA-6492; Fix log truncation to empty segment
    
    This patch ensures that truncation to an empty segment still forces resizing of the offset and time index files, in order to prevent premature rolling of the segment.
    
    I have added unit tests that verify that appends are permitted following truncation to an empty segment. Without the fix, these tests reproduce the failure in which the rolled segment matches the current active segment.
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jiangjie (Becket) Qin <be...@gmail.com>
    
    Closes #4498 from hachikuji/KAFKA-6492
---
 core/src/main/scala/kafka/log/LogSegment.scala     | 15 +++--
 .../test/scala/unit/kafka/log/LogSegmentTest.scala | 64 ++++++++++++++++++++--
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 22 ++++++++
 3 files changed, 90 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 45c820b..5970f42 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -28,7 +28,7 @@ import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time}
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.math._
@@ -345,20 +345,23 @@ class LogSegment private[log] (val log: FileRecords,
    */
   @nonthreadsafe
   def truncateTo(offset: Long): Int = {
+    // Do offset translation before truncating the index to avoid needless scanning
+    // in case we truncate the full index
     val mapping = translateOffset(offset)
-    if (mapping == null)
-      return 0
     offsetIndex.truncateTo(offset)
     timeIndex.truncateTo(offset)
     txnIndex.truncateTo(offset)
-    // after truncation, reset and allocate more space for the (new currently  active) index
+
+    // After truncation, reset and allocate more space for the (new currently active) index
     offsetIndex.resize(offsetIndex.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
-    val bytesTruncated = log.truncateTo(mapping.position)
-    if(log.sizeInBytes == 0) {
+
+    val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
+    if (log.sizeInBytes == 0) {
       created = time.milliseconds
       rollingBasedTimestamp = None
     }
+
     bytesSinceLastIndexEntry = 0
     if (maxTimestampSoFar >= 0)
       loadLargestTimestamp()
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 469b3cc..c45ed0d 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -36,13 +36,16 @@ class LogSegmentTest {
   var logDir: File = _
 
   /* create a segment with the given base offset */
-  def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
+  def createSegment(offset: Long,
+                    indexIntervalBytes: Int = 10,
+                    maxSegmentMs: Int = Int.MaxValue,
+                    time: Time = Time.SYSTEM): LogSegment = {
     val ms = FileRecords.open(Log.logFile(logDir, offset))
     val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
     val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
     val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
-    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = Int.MaxValue,
-      maxSegmentBytes = Int.MaxValue, Time.SYSTEM)
+    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs,
+      maxSegmentBytes = Int.MaxValue, time)
     segments += seg
     seg
   }
@@ -158,6 +161,47 @@ class LogSegmentTest {
   }
 
   @Test
+  def testTruncateEmptySegment() {
+    // This tests the scenario in which the follower truncates to an empty segment. In this
+    // case we must ensure that the index is resized so that the log segment is not mistakenly
+    // rolled due to a full index
+
+    val maxSegmentMs = 300000
+    val time = new MockTime
+    val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    seg.close()
+
+    val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    assertEquals(0, seg.timeIndex.sizeInBytes)
+    assertEquals(0, seg.offsetIndex.sizeInBytes)
+
+    time.sleep(500)
+    reopened.truncateTo(57)
+    assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(reopened.timeIndex.isFull)
+    assertFalse(reopened.offsetIndex.isFull)
+
+    assertFalse(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L,
+      now = time.milliseconds()))
+
+    // The segment should not be rolled even if maxSegmentMs has been exceeded
+    time.sleep(maxSegmentMs + 1)
+    assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L,
+      now = time.milliseconds()))
+
+    // But we should still roll the segment if we cannot fit the next offset
+    assertTrue(reopened.shouldRoll(messagesSize = 1024,
+      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = Int.MaxValue.toLong + 200,
+      now = time.milliseconds()))
+  }
+
+  @Test
   def testReloadLargestTimestampAndNextOffsetAfterTruncation() {
     val numMessages = 30
     val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
@@ -183,10 +227,20 @@ class LogSegmentTest {
   @Test
   def testTruncateFull() {
     // test the case where we fully truncate the log
-    val seg = createSegment(40)
+    val time = new MockTime
+    val seg = createSegment(40, time = time)
     seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+
+    // If the segment is empty after truncation, the create time should be reset
+    time.sleep(500)
+    assertEquals(500, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+
     seg.truncateTo(0)
+    assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
+    assertFalse(seg.timeIndex.isFull)
+    assertFalse(seg.offsetIndex.isFull)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
+
     seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2f78ec3..6753939 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -153,6 +153,28 @@ class LogTest {
   }
 
   @Test
+  def testTruncateToEmptySegment(): Unit = {
+    val log = createLog(logDir, LogConfig())
+
+    // Force a segment roll by using a large offset. The first segment will be empty
+    val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+      baseOffset = Int.MaxValue.toLong + 200)
+    appendAsFollower(log, records)
+    assertEquals(0, log.logSegments.head.size)
+    assertEquals(2, log.logSegments.size)
+
+    // Truncate to an offset before the base offset of the latest segment
+    log.truncateTo(0L)
+    assertEquals(1, log.logSegments.size)
+
+    // Now verify that we can still append to the active segment
+    appendAsFollower(log, TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
+      baseOffset = 100L))
+    assertEquals(1, log.logSegments.size)
+    assertEquals(101L, log.logEndOffset)
+  }
+
+  @Test
   def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
     // simulate the upgrade path by creating a new log with several segments, deleting the
     // snapshot files, and then reloading the log

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.