You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/26 00:10:28 UTC

[GitHub] [kafka] junrao commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

junrao commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r638367249



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    deleteOldSegments(shouldDelete, RetentionSizeBreach)
+    deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
       nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
     }
 
-    deleteOldSegments(shouldDelete, StartOffsetBreach)
+    deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
    * The size of the log in bytes
    */
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
    */
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
    */
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(

Review comment:
       I feel RollAction actually makes the code harder to understand than before. So, it would be useful to think through if we could avoid it. In particular, it seems that anything in postRollAction could just be done in the caller if we return enough context. We are taking a producer snapshot in preRollAction. However, since we are not adding new data here. It seems that we could take producer snapshot in Log.roll() after calling localLog.roll() while holding the Log.lock.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    deleteOldSegments(shouldDelete, RetentionSizeBreach)
+    deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
       nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
     }
 
-    deleteOldSegments(shouldDelete, StartOffsetBreach)
+    deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
    * The size of the log in bytes
    */
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
    */
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
    */
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(
+    preRollAction = (newSegment: LogSegment) => {
+      // Take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
+      // offset align with the new segment offset since this ensures we can recover the segment by beginning
+      // with the corresponding snapshot file and scanning the segment data. Because the segment base offset
+      // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
+      // we manually override the state offset here prior to taking the snapshot.
+      producerStateManager.updateMapEndOffset(newSegment.baseOffset)
+      producerStateManager.takeSnapshot()
+    },
+    postRollAction = (newSegment: LogSegment, deletedSegment: Option[LogSegment]) => {
+      deletedSegment.foreach(segment => deleteProducerSnapshotAsync(Seq(segment)))

Review comment:
       This seems to have exposed an existing bug. During roll, deletedSegment will be non-empty if there is an existing segment of 0 size with the newOffsetToRoll. However, since we take a producer snapshot on newOffsetToRoll before calling postRollAction, we will be deleting the same snapshot we just created.
   
   In this case, I think we don't need to delete producerSnapshot for deletedSegment.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1572,144 +1414,69 @@ class Log(@volatile private var _dir: File,
         .map(_.messageOffset)
         .getOrElse(maxOffsetInMessages - Integer.MAX_VALUE)
 
-      roll(Some(rollOffset))
+      localLog.roll(Some(rollOffset), Some(rollAction))
     } else {
       segment
     }
   }
 
   /**
-   * Roll the log over to a new active segment starting with the current logEndOffset.
+   * Roll the local log over to a new active segment starting with the current logEndOffset.
    * This will trim the index to the exact size of the number of entries it currently contains.
    *
    * @return The newly rolled segment
    */
   def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
-    maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
-      val start = time.hiResClockMs()
-      lock synchronized {
-        checkIfMemoryMappedBufferClosed()
-        val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
-        val logFile = Log.logFile(dir, newOffset)
-
-        if (segments.contains(newOffset)) {
-          // segment with the same base offset already exists and loaded
-          if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
-            // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
-            // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0).
-            warn(s"Trying to roll a new log segment with start offset $newOffset " +
-                 s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
-                 s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
-                 s" size of offset index: ${activeSegment.offsetIndex.entries}.")
-            removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true, LogRoll)
-          } else {
-            throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
-                                     s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
-                                     s"segment is ${segments.get(newOffset)}.")
-          }
-        } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
-          throw new KafkaException(
-            s"Trying to roll a new log segment for topic partition $topicPartition with " +
-            s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
-        } else {
-          val offsetIdxFile = offsetIndexFile(dir, newOffset)
-          val timeIdxFile = timeIndexFile(dir, newOffset)
-          val txnIdxFile = transactionIndexFile(dir, newOffset)
-
-          for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
-            warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
-            Files.delete(file.toPath)
-          }
-
-          segments.lastSegment.foreach(_.onBecomeInactiveSegment())
-        }
-
-        // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
-        // offset align with the new segment offset since this ensures we can recover the segment by beginning
-        // with the corresponding snapshot file and scanning the segment data. Because the segment base offset
-        // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
-        // we manually override the state offset here prior to taking the snapshot.
-        producerStateManager.updateMapEndOffset(newOffset)
-        producerStateManager.takeSnapshot()
-
-        val segment = LogSegment.open(dir,
-          baseOffset = newOffset,
-          config,
-          time = time,
-          initFileSize = config.initFileSize,
-          preallocate = config.preallocate)
-        addSegment(segment)
-
-        // We need to update the segment base offset and append position data of the metadata when log rolls.
-        // The next offset should not change.
-        updateLogEndOffset(nextOffsetMetadata.messageOffset)
-
-        // schedule an asynchronous flush of the old segment
-        scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
-
-        info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")
-
-        segment
-      }
+    lock synchronized {
+      localLog.roll(expectedNextOffset, Some(rollAction))
     }
   }
 
   /**
    * The number of messages appended to the log since the last flush
    */
-  private def unflushedMessages: Long = this.logEndOffset - this.recoveryPoint
+  private def unflushedMessages: Long = logEndOffset - localLog.recoveryPoint
 
   /**
-   * Flush all log segments
+   * Flush all local log segments
    */
   def flush(): Unit = flush(this.logEndOffset)
 
   /**
-   * Flush log segments for all offsets up to offset-1
+   * Flush local log segments for all offsets up to offset-1
    *
    * @param offset The offset to flush up to (non-inclusive); the new recovery point
    */
   def flush(offset: Long): Unit = {
     maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
-      if (offset > this.recoveryPoint) {
+      if (offset > localLog.recoveryPoint) {
         debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime,  current time: ${time.milliseconds()}, " +
           s"unflushed: $unflushedMessages")
-        val segments = logSegments(this.recoveryPoint, offset)
-        segments.foreach(_.flush())
-        // if there are any new segments, we need to flush the parent directory for crash consistency
-        segments.lastOption.filter(_.baseOffset >= this.recoveryPoint).foreach(_ => Utils.flushDir(dir.toPath))
-
+        localLog.flush(offset)
         lock synchronized {
-          checkIfMemoryMappedBufferClosed()
-          if (offset > this.recoveryPoint) {
-            this.recoveryPoint = offset
-            lastFlushedTime.set(time.milliseconds)
-          }
+          localLog.markFlushed(offset)
         }
       }
     }
   }
 
   /**
-   * Completely delete this log directory and all contents from the file system with no delay
+   * Completely delete the local log directory and all contents from the file system with no delay
    */
   private[log] def delete(): Unit = {
     maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
       lock synchronized {
-        checkIfMemoryMappedBufferClosed()
         producerExpireCheck.cancel(true)
-        removeAndDeleteSegments(logSegments, asyncDelete = false, LogDeletion)
         leaderEpochCache.foreach(_.clear())
-        Utils.delete(dir)
-        // File handlers will be closed if this log is deleted
-        isMemoryMappedBufferClosed = true
+        val deletedSegments = localLog.delete()
+        deleteProducerSnapshotAsync(deletedSegments)

Review comment:
       Hmm, the ordering is a bit weird. We delete the directory in localLog.delete(). However, the producer snapshot is in the directory and is deleted later.

##########
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##########
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.channels.ClosedChannelException
+import java.nio.charset.StandardCharsets
+import java.util.regex.Pattern
+import java.util.{Collections, Properties}
+
+import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{MockTime, Scheduler, TestUtils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, SimpleRecord}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.junit.jupiter.api.Assertions.{assertFalse, _}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.jdk.CollectionConverters._
+
+class LocalLogTest {
+
+  import kafka.log.LocalLogTest._
+
+  var config: KafkaConfig = null
+  val tmpDir: File = TestUtils.tempDir()
+  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("test_topic", 1)
+  val logDirFailureChannel = new LogDirFailureChannel(10)
+  val mockTime = new MockTime()
+  val log: LocalLog = createLocalLogWithActiveSegment(config = createLogConfig())
+
+  @BeforeEach
+  def setUp(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
+    config = KafkaConfig.fromProps(props)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+    if (!log.isMemoryMappedBufferClosed) {
+      log.close()
+    }
+    Utils.delete(tmpDir)
+  }
+
+  case class KeyValue(key: String, value: String) {
+    def toRecord(timestamp: => Long = mockTime.milliseconds): SimpleRecord = {
+      new SimpleRecord(timestamp, key.getBytes, value.getBytes)
+    }
+  }
+
+  object KeyValue {
+    def fromRecord(record: Record): KeyValue = {
+      val key =
+        if (record.hasKey)
+          StandardCharsets.UTF_8.decode(record.key()).toString
+        else
+          ""
+      val value =
+        if (record.hasValue)
+          StandardCharsets.UTF_8.decode(record.value()).toString
+        else
+          ""
+      KeyValue(key, value)
+    }
+  }
+
+  private def kvsToRecords(keyValues: Iterable[KeyValue]): Iterable[SimpleRecord] = {
+    keyValues.map(kv => kv.toRecord())
+  }
+
+  private def recordsToKvs(records: Iterable[Record]): Iterable[KeyValue] = {
+    records.map(r => KeyValue.fromRecord(r))
+  }
+
+  private def appendRecords(records: Iterable[SimpleRecord],
+                            log: LocalLog = log,
+                            initialOffset: Long = 0L): Unit = {
+    log.append(lastOffset = initialOffset + records.size - 1,
+      largestTimestamp = records.head.timestamp,
+      shallowOffsetOfMaxTimestamp = initialOffset,
+      records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 0, records.toList : _*))
+  }
+
+  private def readRecords(log: LocalLog = log,
+                          startOffset: Long = 0L,
+                          maxLength: => Int = log.segments.activeSegment.size,
+                          minOneMessage: Boolean = false,
+                          maxOffsetMetadata: => LogOffsetMetadata = log.logEndOffsetMetadata,
+                          includeAbortedTxns: Boolean = false): FetchDataInfo = {
+    log.read(startOffset,
+             maxLength,
+             minOneMessage = minOneMessage,
+             maxOffsetMetadata,
+             includeAbortedTxns = includeAbortedTxns)
+  }
+
+  @Test
+  def testLogDeleteSuccess(): Unit = {
+    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+    appendRecords(List(record))
+    log.roll()
+    assertEquals(2, log.segments.numberOfSegments)
+    assertFalse(logDir.listFiles.isEmpty)
+    val segmentsBeforeDelete = List[LogSegment]() ++ log.segments.values
+    val deletedSegments = log.delete()
+    assertTrue(log.segments.isEmpty)
+    assertEquals(segmentsBeforeDelete, deletedSegments)
+    assertThrows(classOf[KafkaStorageException], () => log.checkIfMemoryMappedBufferClosed())
+    assertFalse(logDir.exists)
+  }
+
+  @Test
+  def testLogDeleteFailureAfterCloseHandlers(): Unit = {
+    log.closeHandlers()
+    assertEquals(1, log.segments.numberOfSegments)
+    val segmentsBeforeDelete = log.segments.values
+    assertThrows(classOf[KafkaStorageException], () => log.delete())
+    assertEquals(1, log.segments.numberOfSegments)
+    assertEquals(segmentsBeforeDelete, log.segments.values)
+    assertTrue(logDir.exists)
+  }
+
+  @Test
+  def testUpdateConfig(): Unit = {
+    val oldConfig = log.config
+    assertEquals(oldConfig, log.config)
+
+    val newConfig = createLogConfig()

Review comment:
       It seems newConfig is always the same as oldConfig?

##########
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##########
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.channels.ClosedChannelException
+import java.nio.charset.StandardCharsets
+import java.util.regex.Pattern
+import java.util.{Collections, Properties}
+
+import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{MockTime, Scheduler, TestUtils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, SimpleRecord}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.junit.jupiter.api.Assertions.{assertFalse, _}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.jdk.CollectionConverters._
+
+class LocalLogTest {
+
+  import kafka.log.LocalLogTest._
+
+  var config: KafkaConfig = null
+  val tmpDir: File = TestUtils.tempDir()
+  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("test_topic", 1)
+  val logDirFailureChannel = new LogDirFailureChannel(10)
+  val mockTime = new MockTime()
+  val log: LocalLog = createLocalLogWithActiveSegment(config = createLogConfig())
+
+  @BeforeEach
+  def setUp(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
+    config = KafkaConfig.fromProps(props)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+    if (!log.isMemoryMappedBufferClosed) {
+      log.close()
+    }
+    Utils.delete(tmpDir)
+  }
+
+  case class KeyValue(key: String, value: String) {
+    def toRecord(timestamp: => Long = mockTime.milliseconds): SimpleRecord = {
+      new SimpleRecord(timestamp, key.getBytes, value.getBytes)
+    }
+  }
+
+  object KeyValue {
+    def fromRecord(record: Record): KeyValue = {
+      val key =
+        if (record.hasKey)
+          StandardCharsets.UTF_8.decode(record.key()).toString
+        else
+          ""
+      val value =
+        if (record.hasValue)
+          StandardCharsets.UTF_8.decode(record.value()).toString
+        else
+          ""
+      KeyValue(key, value)
+    }
+  }
+
+  private def kvsToRecords(keyValues: Iterable[KeyValue]): Iterable[SimpleRecord] = {
+    keyValues.map(kv => kv.toRecord())
+  }
+
+  private def recordsToKvs(records: Iterable[Record]): Iterable[KeyValue] = {
+    records.map(r => KeyValue.fromRecord(r))
+  }
+
+  private def appendRecords(records: Iterable[SimpleRecord],
+                            log: LocalLog = log,
+                            initialOffset: Long = 0L): Unit = {
+    log.append(lastOffset = initialOffset + records.size - 1,
+      largestTimestamp = records.head.timestamp,
+      shallowOffsetOfMaxTimestamp = initialOffset,
+      records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 0, records.toList : _*))
+  }
+
+  private def readRecords(log: LocalLog = log,
+                          startOffset: Long = 0L,
+                          maxLength: => Int = log.segments.activeSegment.size,
+                          minOneMessage: Boolean = false,
+                          maxOffsetMetadata: => LogOffsetMetadata = log.logEndOffsetMetadata,
+                          includeAbortedTxns: Boolean = false): FetchDataInfo = {
+    log.read(startOffset,
+             maxLength,
+             minOneMessage = minOneMessage,
+             maxOffsetMetadata,
+             includeAbortedTxns = includeAbortedTxns)
+  }
+
+  @Test
+  def testLogDeleteSuccess(): Unit = {
+    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+    appendRecords(List(record))
+    log.roll()
+    assertEquals(2, log.segments.numberOfSegments)
+    assertFalse(logDir.listFiles.isEmpty)
+    val segmentsBeforeDelete = List[LogSegment]() ++ log.segments.values
+    val deletedSegments = log.delete()
+    assertTrue(log.segments.isEmpty)
+    assertEquals(segmentsBeforeDelete, deletedSegments)
+    assertThrows(classOf[KafkaStorageException], () => log.checkIfMemoryMappedBufferClosed())
+    assertFalse(logDir.exists)
+  }
+
+  @Test
+  def testLogDeleteFailureAfterCloseHandlers(): Unit = {
+    log.closeHandlers()
+    assertEquals(1, log.segments.numberOfSegments)
+    val segmentsBeforeDelete = log.segments.values
+    assertThrows(classOf[KafkaStorageException], () => log.delete())
+    assertEquals(1, log.segments.numberOfSegments)
+    assertEquals(segmentsBeforeDelete, log.segments.values)
+    assertTrue(logDir.exists)
+  }
+
+  @Test
+  def testUpdateConfig(): Unit = {
+    val oldConfig = log.config
+    assertEquals(oldConfig, log.config)
+
+    val newConfig = createLogConfig()
+    log.updateConfig(newConfig)
+    assertEquals(newConfig, log.config)
+  }
+
+  @Test
+  def testLogDirRenameToNewDir(): Unit = {
+    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+    appendRecords(List(record))
+    log.roll()
+    assertEquals(2, log.segments.numberOfSegments)
+    val newLogDir = TestUtils.randomPartitionLogDir(tmpDir)
+    assertTrue(log.renameDir(newLogDir.getName))
+    assertFalse(logDir.exists())
+    assertTrue(newLogDir.exists())
+    assertEquals(newLogDir, log.dir)
+    assertEquals(newLogDir.getParent, log.parentDir)
+    assertEquals(newLogDir.getParent, log.dir.getParent)
+    log.segments.values.foreach(segment => assertEquals(newLogDir.getPath, segment.log.file().getParentFile.getPath))
+    assertEquals(2, log.segments.numberOfSegments)
+  }
+
+  @Test
+  def testLogDirRenameToExistingDir(): Unit = {
+    assertFalse(log.renameDir(log.dir.getName))
+  }
+
+  @Test
+  def testLogFlush(): Unit = {
+    assertEquals(0L, log.recoveryPoint)
+    assertEquals(mockTime.milliseconds, log.lastFlushTime)
+
+    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+    appendRecords(List(record))
+    mockTime.sleep(1)
+    val newSegment = log.roll()
+    log.flush(newSegment.baseOffset)
+    log.markFlushed(newSegment.baseOffset)
+    assertEquals(1L, log.recoveryPoint)
+    assertEquals(mockTime.milliseconds, log.lastFlushTime)
+  }
+
+  @Test
+  def testLogAppend(): Unit = {
+    val fetchDataInfoBeforeAppend = readRecords(maxLength = 1)
+    assertTrue(fetchDataInfoBeforeAppend.records.records.asScala.isEmpty)
+
+    mockTime.sleep(1)
+    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
+    appendRecords(kvsToRecords(keyValues))
+    assertEquals(2L, log.logEndOffset)
+    assertEquals(0L, log.recoveryPoint)
+    val fetchDataInfo = readRecords()
+    assertEquals(2L, fetchDataInfo.records.records.asScala.size)
+    assertEquals(keyValues, recordsToKvs(fetchDataInfo.records.records.asScala))
+  }
+
+  @Test
+  def testLogCloseSuccess(): Unit = {
+    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
+    appendRecords(kvsToRecords(keyValues))
+    log.close()
+    assertThrows(classOf[ClosedChannelException], () => appendRecords(kvsToRecords(keyValues), initialOffset = 2L))
+  }
+
+  @Test
+  def testLogCloseIdempotent(): Unit = {
+    log.close()
+    // Check that LocalLog.close() is idempotent
+    log.close()
+  }
+
+  @Test
+  def testLogCloseFailureWhenInMemoryBufferClosed(): Unit = {
+    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
+    appendRecords(kvsToRecords(keyValues))
+    log.closeHandlers()
+    assertThrows(classOf[KafkaStorageException], () => log.close())
+  }
+
+  @Test
+  def testLogCloseHandlers(): Unit = {
+    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
+    appendRecords(kvsToRecords(keyValues))
+    log.closeHandlers()
+    assertThrows(classOf[ClosedChannelException],
+                 () => appendRecords(kvsToRecords(keyValues), initialOffset = 2L))
+  }
+
+  @Test
+  def testLogCloseHandlersIdempotent(): Unit = {
+    log.closeHandlers()
+    // Check that LocalLog.closeHandlers() is idempotent
+    log.closeHandlers()
+  }
+
+  private def testRemoveAndDeleteSegments(asyncDelete: Boolean): Unit = {
+    for (offset <- 0 to 8) {
+      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+      appendRecords(List(record), initialOffset = offset)
+      log.roll()
+    }
+
+    assertEquals(10L, log.segments.numberOfSegments)
+
+    class TestDeletionReason extends SegmentDeletionReason {
+      private var _deletedSegments: Iterable[LogSegment] = List[LogSegment]()
+
+      override def logReason(toDelete: List[LogSegment]): Unit = {
+        _deletedSegments = List[LogSegment]() ++ toDelete
+      }
+
+      def deletedSegments: Iterable[LogSegment] = _deletedSegments
+    }
+    val reason = new TestDeletionReason()
+    val toDelete = List[LogSegment]() ++ log.segments.values
+    log.removeAndDeleteSegments(toDelete, asyncDelete = asyncDelete, reason)
+    if (asyncDelete) {
+      mockTime.sleep(log.config.fileDeleteDelayMs + 1)
+    }
+    assertTrue(log.segments.isEmpty)
+    assertEquals(toDelete, reason.deletedSegments)
+    toDelete.foreach(segment => assertTrue(segment.deleted()))
+  }
+
+  @Test
+  def testRemoveAndDeleteSegmentsSync(): Unit = {
+    testRemoveAndDeleteSegments(asyncDelete = false)
+  }
+
+  @Test
+  def testRemoveAndDeleteSegmentsAsync(): Unit = {
+    testRemoveAndDeleteSegments(asyncDelete = true)
+  }
+
+  private def testDeleteSegmentFiles(asyncDelete: Boolean): Unit = {
+    for (offset <- 0 to 8) {
+      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+      appendRecords(List(record), initialOffset = offset)
+      log.roll()
+    }
+
+    assertEquals(10L, log.segments.numberOfSegments)
+
+    val toDelete = List[LogSegment]() ++ log.segments.values
+    LocalLog.deleteSegmentFiles(toDelete, asyncDelete = asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
+    if (asyncDelete) {
+      toDelete.foreach {
+        segment =>
+          assertFalse(segment.deleted())
+          assertTrue(segment.hasSuffix(LocalLog.DeletedFileSuffix))
+      }
+      mockTime.sleep(log.config.fileDeleteDelayMs + 1)
+    }
+    toDelete.foreach(segment => assertTrue(segment.deleted()))
+  }
+
+  @Test
+  def testDeleteSegmentFilesSync(): Unit = {
+    testDeleteSegmentFiles(asyncDelete = false)
+  }
+
+  @Test
+  def testDeleteSegmentFilesAsync(): Unit = {
+    testDeleteSegmentFiles(asyncDelete = true)
+  }
+
+  @Test
+  def testDeletableSegmentsFilter(): Unit = {
+    for (offset <- 0 to 8) {
+      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+      appendRecords(List(record), initialOffset = offset)
+      log.roll()
+    }
+
+    assertEquals(10L, log.segments.numberOfSegments)
+
+    {
+      val deletable = log.deletableSegments(
+        (segment: LogSegment, _: Option[LogSegment], _: Long) => segment.baseOffset <= 5)
+      val expected = log.segments.nonActiveLogSegmentsFrom(0L).filter(segment => segment.baseOffset <= 5)
+      assertEquals(expected, deletable.toList)
+    }
+
+    {
+      val deletable = log.deletableSegments((_: LogSegment, _: Option[LogSegment], _: Long) => true)
+      val expected = log.segments.nonActiveLogSegmentsFrom(0L).toList
+      assertEquals(expected, deletable.toList)
+    }
+
+    {
+      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+      appendRecords(List(record), initialOffset = 9L)
+      val deletable = log.deletableSegments((_: LogSegment, _: Option[LogSegment], _: Long) => true)
+      val expected = log.segments.values.toList
+      assertEquals(expected, deletable.toList)
+    }
+  }
+
+  @Test
+  def testDeletableSegmentsIteration(): Unit = {
+    for (offset <- 0 to 8) {
+      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+      appendRecords(List(record), initialOffset = offset)
+      log.roll()
+    }
+
+    assertEquals(10L, log.segments.numberOfSegments)
+
+    var offset = 0
+    log.deletableSegments(
+      (segment: LogSegment, nextSegmentOpt: Option[LogSegment], logEndOffset: Long) => {
+        assertEquals(offset, segment.baseOffset)
+        val floorSegmentOpt = log.segments.floorSegment(offset)
+        assertTrue(floorSegmentOpt.isDefined)
+        assertEquals(floorSegmentOpt.get, segment)
+        if (offset == log.logEndOffset) {
+          assertFalse(nextSegmentOpt.isDefined)
+        } else {
+          assertTrue(nextSegmentOpt.isDefined)
+          val higherSegmentOpt = log.segments.higherSegment(segment.baseOffset)
+          assertTrue(higherSegmentOpt.isDefined)
+          assertEquals(segment.baseOffset + 1, higherSegmentOpt.get.baseOffset)
+          assertEquals(higherSegmentOpt.get, nextSegmentOpt.get)
+        }
+        assertEquals(log.logEndOffset, logEndOffset)
+        offset += 1
+        true
+      })

Review comment:
       Should we assert sth after the log.deletableSegments() call?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1852,65 +1612,24 @@ class Log(@volatile private var _dir: File,
     logString.toString
   }
 
-  /**
-   * This method deletes the given log segments by doing the following for each of them:
-   * <ol>
-   *   <li>It removes the segment from the segment map so that it will no longer be used for reads.
-   *   <li>It renames the index and log files by appending .deleted to the respective file name
-   *   <li>It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously
-   * </ol>
-   * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
-   * physically deleting a file while it is being read.
-   *
-   * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded
-   * or the immediate caller will catch and handle IOException
-   *
-   * @param segments The log segments to schedule for deletion
-   * @param asyncDelete Whether the segment files should be deleted asynchronously
-   */
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
-                                      asyncDelete: Boolean,
-                                      reason: SegmentDeletionReason): Unit = {
-    if (segments.nonEmpty) {
-      lock synchronized {
-        // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
-        // removing the deleted segment, we should force materialization of the iterator here, so that results of the
-        // iteration remain valid and deterministic.
-        val toDelete = segments.toList
-        reason.logReason(this, toDelete)
-        toDelete.foreach { segment =>
-          this.segments.remove(segment.baseOffset)
-        }
-        deleteSegmentFiles(toDelete, asyncDelete)
-      }
-    }
-  }
-
-  private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
-    Log.deleteSegmentFiles(segments, asyncDelete, deleteProducerStateSnapshots, dir, topicPartition,
-      config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
-  }
-
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {

Review comment:
       It seems that we could remove isRecoveredSwapFile since it's always false from the caller.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1806,37 +1566,37 @@ class Log(@volatile private var _dir: File,
     endOffset: Long
   ): Unit = {
     logStartOffset = startOffset
-    nextOffsetMetadata = LogOffsetMetadata(endOffset, activeSegment.baseOffset, activeSegment.size)
-    recoveryPoint = math.min(recoveryPoint, endOffset)
-    rebuildProducerState(endOffset, producerStateManager)
+    lock synchronized {
+      rebuildProducerState(endOffset, producerStateManager)
+    }

Review comment:
       This change has a couple of issues. 
   (1) updateHighWatermark() now only updates the offset, but not the corresponding offset metadata. The offset metadata is needed in serving fetch requests. Recomputing that requires index lookup and log scan, and can be extensive. So, we need to preserve the offset metadata during truncate() and truncateFully().
   (2) I think updateHighWatermark() needs to be called within the lock. updateHighWatermark() reads local log's logEndOffset. So, we don't want the logEndOffset to change while updateHighWatermark() is called.

##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -246,17 +262,17 @@ object LogLoader extends Logging {
         return fn
       } catch {
         case e: LogSegmentOffsetOverflowException =>
-          info(s"${params.logIdentifier}Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
-          Log.splitOverflowedSegment(
+          info(s"${params.logIdentifier} Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
+          val result = Log.splitOverflowedSegment(
             e.segment,
             params.segments,
             params.dir,
             params.topicPartition,
             params.config,
             params.scheduler,
             params.logDirFailureChannel,
-            params.producerStateManager,
             params.logIdentifier)
+          deleteProducerSnapshotsAsync(result.deletedSegments, params)

Review comment:
       This is unnecessary since during splitting, the old segment is replaced with a new segment with the same base offset. So, result.deletedSegments is always empty.

##########
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##########
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.channels.ClosedChannelException
+import java.nio.charset.StandardCharsets
+import java.util.regex.Pattern
+import java.util.{Collections, Properties}
+
+import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{MockTime, Scheduler, TestUtils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, SimpleRecord}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.junit.jupiter.api.Assertions.{assertFalse, _}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.jdk.CollectionConverters._
+
+class LocalLogTest {
+
+  import kafka.log.LocalLogTest._
+
+  var config: KafkaConfig = null
+  val tmpDir: File = TestUtils.tempDir()
+  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("test_topic", 1)
+  val logDirFailureChannel = new LogDirFailureChannel(10)
+  val mockTime = new MockTime()
+  val log: LocalLog = createLocalLogWithActiveSegment(config = createLogConfig())
+
+  @BeforeEach
+  def setUp(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
+    config = KafkaConfig.fromProps(props)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+    if (!log.isMemoryMappedBufferClosed) {

Review comment:
       Should we use log.checkIfMemoryMappedBufferClosed()?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org