Posted to commits@kafka.apache.org by sa...@apache.org on 2023/02/13 08:16:59 UTC

[kafka] branch trunk updated: MINOR: Remove unnecessary usage of `LazyIndex` (#13218)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 959756ae9d1 MINOR: Remove unnecessary usage of `LazyIndex` (#13218)
959756ae9d1 is described below

commit 959756ae9d14f434109525aa43513358a27bf75c
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Mon Feb 13 00:16:37 2023 -0800

    MINOR: Remove unnecessary usage of `LazyIndex` (#13218)
    
    The remote index classes use `LazyIndex`, but immediately
    force materialization. This makes the code more verbose and
    is misleading, since in practice the indexes are not used
    lazily.
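    
    For illustration, a minimal sketch of the call-site change in the
    remote index cache (the wrapper method name is hypothetical; the
    constructor arguments mirror the diff below):
    
        import java.io.File
        import org.apache.kafka.storage.internals.log.OffsetIndex
    
        def openRemoteOffsetIndex(offsetIndexFile: File, offset: Long): OffsetIndex = {
          // Previously: LazyIndex.forOffset(offsetIndexFile, offset, Int.MaxValue, false)
          //             followed immediately by index.get.sanityCheck().
          // Now the read-only index is constructed directly and sanity checked.
          val offsetIndex = new OffsetIndex(offsetIndexFile, offset, Int.MaxValue, false)
          offsetIndex.sanityCheck()
          offsetIndex
        }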
    
    Also simplify `LazyIndex.forOffset/forTime` by removing the
    `writable` parameter, which is always `true`.
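    
    Under the new signatures, a call site might look like this sketch
    (the wrapper method is hypothetical; the factory calls match the
    updated `LazyIndex` API in the diff):
    
        import java.io.File
        import org.apache.kafka.storage.internals.log.{LazyIndex, OffsetIndex, TimeIndex}
    
        // The factories now always open writable indexes; the boolean flag is gone.
        def segmentIndexes(offsetIndexFile: File, timeIndexFile: File, baseOffset: Long,
                           maxIndexSize: Int): (LazyIndex[OffsetIndex], LazyIndex[TimeIndex]) = {
          val offsetIdx = LazyIndex.forOffset(offsetIndexFile, baseOffset, maxIndexSize)
          val timeIdx = LazyIndex.forTime(timeIndexFile, baseOffset, maxIndexSize)
          (offsetIdx, timeIdx)
        }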
    
    Reviewers: Satish Duggana <sa...@apache.org>
---
 core/src/main/scala/kafka/log/LogSegment.scala     |  4 +-
 .../scala/kafka/log/remote/RemoteIndexCache.scala  | 43 +++++++++-------------
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |  4 +-
 .../kafka/log/remote/RemoteIndexCacheTest.scala    |  4 +-
 .../kafka/storage/internals/log/LazyIndex.java     | 17 ++++-----
 5 files changed, 30 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index e5fe0dfd585..557c3b5593d 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -672,8 +672,8 @@ object LogSegment {
     val maxIndexSize = config.maxIndexSize
     new LogSegment(
       FileRecords.open(UnifiedLog.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
-      LazyIndex.forOffset(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize, true),
-      LazyIndex.forTime(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize, true),
+      LazyIndex.forOffset(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize),
+      LazyIndex.forTime(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize),
       new TransactionIndex(baseOffset, UnifiedLog.transactionIndexFile(dir, baseOffset, fileSuffix)),
       baseOffset,
       indexIntervalBytes = config.indexInterval,
diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
index 8fa07cf2ae8..c3085d53a4a 100644
--- a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
+++ b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}
-import org.apache.kafka.storage.internals.log.{LazyIndex, LogFileUtils, OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex}
 
 import java.io.{Closeable, File, InputStream}
 import java.nio.file.{Files, Path}
@@ -37,14 +37,14 @@ object RemoteIndexCache {
   val TmpFileSuffix = ".tmp"
 }
 
-class Entry(val offsetIndex: LazyIndex[OffsetIndex], val timeIndex: LazyIndex[TimeIndex], val txnIndex: TransactionIndex) {
+class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) {
   private var markedForCleanup: Boolean = false
   private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
 
   def lookupOffset(targetOffset: Long): OffsetPosition = {
     CoreUtils.inLock(lock.readLock()) {
       if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
-      else offsetIndex.get.lookup(targetOffset)
+      else offsetIndex.lookup(targetOffset)
     }
   }
 
@@ -52,8 +52,8 @@ class Entry(val offsetIndex: LazyIndex[OffsetIndex], val timeIndex: LazyIndex[Ti
     CoreUtils.inLock(lock.readLock()) {
       if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
 
-      val timestampOffset = timeIndex.get.lookup(timestamp)
-      offsetIndex.get.lookup(math.max(startingOffset, timestampOffset.offset))
+      val timestampOffset = timeIndex.lookup(timestamp)
+      offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset))
     }
   }
 
@@ -143,23 +143,14 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
 
         if (offsetIndexFile.exists() && timestampIndexFile.exists() && txnIndexFile.exists()) {
 
-          val offsetIndex: LazyIndex[OffsetIndex] = {
-            val index = LazyIndex.forOffset(offsetIndexFile, offset, Int.MaxValue, false)
-            index.get.sanityCheck()
-            index
-          }
+          val offsetIndex = new OffsetIndex(offsetIndexFile, offset, Int.MaxValue, false)
+          offsetIndex.sanityCheck()
 
-          val timeIndex: LazyIndex[TimeIndex] = {
-            val index = LazyIndex.forTime(timestampIndexFile, offset, Int.MaxValue, false)
-            index.get.sanityCheck()
-            index
-          }
+          val timeIndex = new TimeIndex(timestampIndexFile, offset, Int.MaxValue, false)
+          timeIndex.sanityCheck()
 
-          val txnIndex: TransactionIndex = {
-            val index = new TransactionIndex(offset, txnIndexFile)
-            index.sanityCheck()
-            index
-          }
+          val txnIndex = new TransactionIndex(offset, txnIndexFile)
+          txnIndex.sanityCheck()
 
           val entry = new Entry(offsetIndex, timeIndex, txnIndex)
           entries.put(uuid, entry)
@@ -238,19 +229,19 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
         // uuid.toString uses URL encoding which is safe for filenames and URLs.
         val fileName = startOffset.toString + "_" + uuid.toString + "_"
 
-        val offsetIndex: LazyIndex[OffsetIndex] = loadIndexFile(fileName, UnifiedLog.IndexFileSuffix,
+        val offsetIndex: OffsetIndex = loadIndexFile(fileName, UnifiedLog.IndexFileSuffix,
           rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET),
           file => {
-            val index = LazyIndex.forOffset(file, startOffset, Int.MaxValue, false)
-            index.get.sanityCheck()
+            val index = new OffsetIndex(file, startOffset, Int.MaxValue, false)
+            index.sanityCheck()
             index
           })
 
-        val timeIndex: LazyIndex[TimeIndex] = loadIndexFile(fileName, UnifiedLog.TimeIndexFileSuffix,
+        val timeIndex: TimeIndex = loadIndexFile(fileName, UnifiedLog.TimeIndexFileSuffix,
           rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TIMESTAMP),
           file => {
-            val index = LazyIndex.forTime(file, startOffset, Int.MaxValue, false)
-            index.get.sanityCheck()
+            val index = new TimeIndex(file, startOffset, Int.MaxValue, false)
+            index.sanityCheck()
             index
           })
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 79aab9cd486..c69f8ab5bb1 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -46,8 +46,8 @@ object LogTestUtils {
                     indexIntervalBytes: Int = 10,
                     time: Time = Time.SYSTEM): LogSegment = {
     val ms = FileRecords.open(UnifiedLog.logFile(logDir, offset))
-    val idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(logDir, offset), offset, 1000, true)
-    val timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(logDir, offset), offset, 1500, true)
+    val idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(logDir, offset), offset, 1000)
+    val timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(logDir, offset), offset, 1500)
     val txnIndex = new TransactionIndex(offset, UnifiedLog.transactionIndexFile(logDir, offset))
 
     new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 0129d67b944..89afa848ddd 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -86,7 +86,7 @@ class RemoteIndexCacheTest {
 
   @Test
   def testFetchIndexFromRemoteStorage(): Unit = {
-    val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+    val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex
     val offsetPosition1 = offsetIndex.entry(1)
     // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
     val resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset)
@@ -104,7 +104,7 @@ class RemoteIndexCacheTest {
 
   @Test
   def testPositionForNonExistingIndexFromRemoteStorage(): Unit = {
-    val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+    val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex
     val lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset)
     val greaterOffsetThanLastOffset = offsetIndex.lastOffset + 1
     assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset))
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java
index b1e16a8f67b..1172bb596e7 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java
@@ -141,26 +141,23 @@ public class LazyIndex<T extends AbstractIndex> {
     private final Lock lock = new ReentrantLock();
     private final long baseOffset;
     private final int maxIndexSize;
-    private final boolean writable;
     private final IndexType indexType;
 
     private volatile IndexWrapper indexWrapper;
 
-    private LazyIndex(IndexWrapper indexWrapper, long baseOffset, int maxIndexSize, boolean writable,
-                      IndexType indexType) {
+    private LazyIndex(IndexWrapper indexWrapper, long baseOffset, int maxIndexSize, IndexType indexType) {
         this.indexWrapper = indexWrapper;
         this.baseOffset = baseOffset;
         this.maxIndexSize = maxIndexSize;
-        this.writable = writable;
         this.indexType = indexType;
     }
 
-    public static LazyIndex<OffsetIndex> forOffset(File file, long baseOffset, int maxIndexSize, boolean writable) {
-        return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, writable, IndexType.OFFSET);
+    public static LazyIndex<OffsetIndex> forOffset(File file, long baseOffset, int maxIndexSize) {
+        return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, IndexType.OFFSET);
     }
 
-    public static LazyIndex<TimeIndex> forTime(File file, long baseOffset, int maxIndexSize, boolean writable) {
-        return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, writable, IndexType.TIME);
+    public static LazyIndex<TimeIndex> forTime(File file, long baseOffset, int maxIndexSize) {
+        return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, IndexType.TIME);
     }
 
     public File file() {
@@ -234,9 +231,9 @@ public class LazyIndex<T extends AbstractIndex> {
     private T loadIndex(File file) throws IOException {
         switch (indexType) {
             case OFFSET:
-                return (T) new OffsetIndex(file, baseOffset, maxIndexSize, writable);
+                return (T) new OffsetIndex(file, baseOffset, maxIndexSize, true);
             case TIME:
-                return (T) new TimeIndex(file, baseOffset, maxIndexSize, writable);
+                return (T) new TimeIndex(file, baseOffset, maxIndexSize, true);
             default:
                 throw new IllegalStateException("Unexpected indexType " + indexType);
         }