Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/12 08:37:23 UTC

[GitHub] [kafka] kowshik commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic to introduce LogLoader

kowshik commented on a change in pull request #10478:
URL: https://github.com/apache/kafka/pull/10478#discussion_r611433367



##########
File path: core/src/test/scala/unit/kafka/log/LogTestUtils.scala
##########
@@ -37,4 +47,246 @@ object LogTestUtils {
 
     new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
   }
+
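+  /**
+   * Build a [[LogConfig]] for tests. Each property falls back to the corresponding
+   * value in [[Defaults]] unless overridden by the caller.
+   */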
+  def createLogConfig(segmentMs: Long = Defaults.SegmentMs,
+                      segmentBytes: Int = Defaults.SegmentSize,
+                      retentionMs: Long = Defaults.RetentionMs,
+                      retentionBytes: Long = Defaults.RetentionSize,
+                      segmentJitterMs: Long = Defaults.SegmentJitterMs,
+                      cleanupPolicy: String = Defaults.CleanupPolicy,
+                      maxMessageBytes: Int = Defaults.MaxMessageSize,
+                      indexIntervalBytes: Int = Defaults.IndexInterval,
+                      segmentIndexBytes: Int = Defaults.MaxIndexSize,
+                      messageFormatVersion: String = Defaults.MessageFormatVersion,
+                      fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs): LogConfig = {
+    val logProps = new Properties()
+
+    logProps.put(LogConfig.SegmentMsProp, segmentMs: java.lang.Long)
+    logProps.put(LogConfig.SegmentBytesProp, segmentBytes: Integer)
+    logProps.put(LogConfig.RetentionMsProp, retentionMs: java.lang.Long)
+    logProps.put(LogConfig.RetentionBytesProp, retentionBytes: java.lang.Long)
+    logProps.put(LogConfig.SegmentJitterMsProp, segmentJitterMs: java.lang.Long)
+    logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+    logProps.put(LogConfig.MaxMessageBytesProp, maxMessageBytes: Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, indexIntervalBytes: Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, segmentIndexBytes: Integer)
+    logProps.put(LogConfig.MessageFormatVersionProp, messageFormatVersion)
+    logProps.put(LogConfig.FileDeleteDelayMsProp, fileDeleteDelayMs: java.lang.Long)
+    LogConfig(logProps)
+  }
+
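+  /**
+   * Create and load a [[Log]] in the given directory, wiring in a fresh
+   * [[LogDirFailureChannel]] for failure reporting.
+   */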
+  def createLog(dir: File,
+                config: LogConfig,
+                brokerTopicStats: BrokerTopicStats,
+                scheduler: Scheduler,
+                time: Time,
+                logStartOffset: Long = 0L,
+                recoveryPoint: Long = 0L,
+                maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
+                producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
+                lastShutdownClean: Boolean = true,
+                topicId: Option[Uuid] = None): Log = {
+    Log(dir = dir,
+      config = config,
+      logStartOffset = logStartOffset,
+      recoveryPoint = recoveryPoint,
+      scheduler = scheduler,
+      brokerTopicStats = brokerTopicStats,
+      time = time,
+      maxProducerIdExpirationMs = maxProducerIdExpirationMs,
+      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10),
+      lastShutdownClean = lastShutdownClean,
+      topicId = topicId)
+  }
+
+  /**
+   * Check if the given log contains any segment with records that cause offset overflow.
+   * @param log Log to check
+   * @return true if log contains at least one segment with offset overflow; false otherwise
+   */
+  def hasOffsetOverflow(log: Log): Boolean = firstOverflowSegment(log).isDefined
+
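+  /**
+   * Return the first segment (in log order) containing a batch whose offsets fall
+   * outside the range [baseOffset, baseOffset + Int.MaxValue], if any.
+   */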
+  def firstOverflowSegment(log: Log): Option[LogSegment] = {
+    def hasOverflow(baseOffset: Long, batch: RecordBatch): Boolean =
+      batch.lastOffset > baseOffset + Int.MaxValue || batch.baseOffset < baseOffset
+
+    for (segment <- log.logSegments) {
+      val overflowBatch = segment.log.batches.asScala.find(batch => hasOverflow(segment.baseOffset, batch))
+      if (overflowBatch.isDefined)
+        return Some(segment)
+    }
+    None
+  }
+
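+  // Open the segment's backing .log file directly, bypassing LogSegment and its indexes.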
+  private def rawSegment(logDir: File, baseOffset: Long): FileRecords =
+    FileRecords.open(Log.logFile(logDir, baseOffset))
+
+  /**
+   * Initialize the given log directory with a set of segments, one of which contains
+   * offsets that overflow the segment's relative offset range.
+   */
+  def initializeLogDirWithOverflowedSegment(logDir: File): Unit = {
+    def writeSampleBatches(baseOffset: Long, segment: FileRecords): Long = {
+      def record(offset: Long) = {
+        val data = offset.toString.getBytes
+        new SimpleRecord(data, data)
+      }
+
+      segment.append(MemoryRecords.withRecords(baseOffset, CompressionType.NONE, 0,
+        record(baseOffset)))
+      segment.append(MemoryRecords.withRecords(baseOffset + 1, CompressionType.NONE, 0,
+        record(baseOffset + 1),
+        record(baseOffset + 2)))
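+      // Relative offset Int.MaxValue - 1 is still representable, so a single round of
+      // these batches does not itself overflow the segment.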
+      segment.append(MemoryRecords.withRecords(baseOffset + Int.MaxValue - 1, CompressionType.NONE, 0,
+        record(baseOffset + Int.MaxValue - 1)))
+      // Create the index files explicitly so that loading the log does not trigger
+      // segment recovery, which would truncate the segment.
+      Log.offsetIndexFile(logDir, baseOffset).createNewFile()
+      Log.timeIndexFile(logDir, baseOffset).createNewFile()
+      baseOffset + Int.MaxValue
+    }
+
+    def writeNormalSegment(baseOffset: Long): Long = {
+      val segment = rawSegment(logDir, baseOffset)
+      try writeSampleBatches(baseOffset, segment)
+      finally segment.close()
+    }
+
+    def writeOverflowSegment(baseOffset: Long): Long = {
+      val segment = rawSegment(logDir, baseOffset)
+      try {
+        val nextOffset = writeSampleBatches(baseOffset, segment)
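+        // A second round of batches pushes offsets past baseOffset + Int.MaxValue,
+        // overflowing the segment's relative offsets.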
+        writeSampleBatches(nextOffset, segment)
+      } finally segment.close()
+    }
+
+    // We create three segments; the second contains offsets that overflow.
+    var nextOffset = 0L
+    nextOffset = writeNormalSegment(nextOffset)
+    nextOffset = writeOverflowSegment(nextOffset)
+    writeNormalSegment(nextOffset)
+  }
+
+  def allRecords(log: Log): List[Record] = {
+    val recordsFound = ListBuffer[Record]()
+    for (logSegment <- log.logSegments) {
+      for (batch <- logSegment.log.batches.asScala) {
+        recordsFound ++= batch.iterator().asScala
+      }
+    }
+    recordsFound.toList
+  }
+
+  def verifyRecordsInLog(log: Log, expectedRecords: List[Record]): Unit = {

Review comment:
       Good catch. It was unused and I've eliminated it.
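
For reference, a minimal sketch of how these helpers could be combined in a test
(the MockTime value, the temp-dir helper, and the JUnit assertion here are
illustrative assumptions, not part of this PR):

    import kafka.server.BrokerTopicStats
    import kafka.utils.{MockTime, TestUtils}
    import org.junit.jupiter.api.Assertions.assertTrue

    val mockTime = new MockTime()
    val logDir = TestUtils.tempDir()
    LogTestUtils.initializeLogDirWithOverflowedSegment(logDir)
    // Use recoveryPoint = Long.MaxValue so loading skips recovery and leaves the
    // overflowed segment intact.
    val log = LogTestUtils.createLog(
      dir = logDir,
      config = LogTestUtils.createLogConfig(indexIntervalBytes = 1),
      brokerTopicStats = new BrokerTopicStats,
      scheduler = mockTime.scheduler,
      time = mockTime,
      recoveryPoint = Long.MaxValue)
    assertTrue(LogTestUtils.hasOffsetOverflow(log))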




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org