You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/02/13 08:16:59 UTC
[kafka] branch trunk updated: MINOR: Remove unnecessary usage of `LazyIndex` (#13218)
This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 959756ae9d1 MINOR: Remove unnecessary usage of `LazyIndex` (#13218)
959756ae9d1 is described below
commit 959756ae9d14f434109525aa43513358a27bf75c
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Mon Feb 13 00:16:37 2023 -0800
MINOR: Remove unnecessary usage of `LazyIndex` (#13218)
The remote index classes use `LazyIndex`, but immediately
force materialization. This results in more verbose code
and is misleading, since the indexes are not used lazily
in practice.
Also simplify `LazyIndex.forOffset/forTime` by removing the
`writable` parameter, which is always `true` for the remaining callers.
Reviewers: Satish Duggana <sa...@apache.org>
---
core/src/main/scala/kafka/log/LogSegment.scala | 4 +-
.../scala/kafka/log/remote/RemoteIndexCache.scala | 43 +++++++++-------------
.../test/scala/unit/kafka/log/LogTestUtils.scala | 4 +-
.../kafka/log/remote/RemoteIndexCacheTest.scala | 4 +-
.../kafka/storage/internals/log/LazyIndex.java | 17 ++++-----
5 files changed, 30 insertions(+), 42 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index e5fe0dfd585..557c3b5593d 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -672,8 +672,8 @@ object LogSegment {
val maxIndexSize = config.maxIndexSize
new LogSegment(
FileRecords.open(UnifiedLog.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
- LazyIndex.forOffset(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize, true),
- LazyIndex.forTime(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize, true),
+ LazyIndex.forOffset(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize),
+ LazyIndex.forTime(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize),
new TransactionIndex(baseOffset, UnifiedLog.transactionIndexFile(dir, baseOffset, fileSuffix)),
baseOffset,
indexIntervalBytes = config.indexInterval,
diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
index 8fa07cf2ae8..c3085d53a4a 100644
--- a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
+++ b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}
-import org.apache.kafka.storage.internals.log.{LazyIndex, LogFileUtils, OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex}
import java.io.{Closeable, File, InputStream}
import java.nio.file.{Files, Path}
@@ -37,14 +37,14 @@ object RemoteIndexCache {
val TmpFileSuffix = ".tmp"
}
-class Entry(val offsetIndex: LazyIndex[OffsetIndex], val timeIndex: LazyIndex[TimeIndex], val txnIndex: TransactionIndex) {
+class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) {
private var markedForCleanup: Boolean = false
private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
def lookupOffset(targetOffset: Long): OffsetPosition = {
CoreUtils.inLock(lock.readLock()) {
if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
- else offsetIndex.get.lookup(targetOffset)
+ else offsetIndex.lookup(targetOffset)
}
}
@@ -52,8 +52,8 @@ class Entry(val offsetIndex: LazyIndex[OffsetIndex], val timeIndex: LazyIndex[Ti
CoreUtils.inLock(lock.readLock()) {
if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
- val timestampOffset = timeIndex.get.lookup(timestamp)
- offsetIndex.get.lookup(math.max(startingOffset, timestampOffset.offset))
+ val timestampOffset = timeIndex.lookup(timestamp)
+ offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset))
}
}
@@ -143,23 +143,14 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
if (offsetIndexFile.exists() && timestampIndexFile.exists() && txnIndexFile.exists()) {
- val offsetIndex: LazyIndex[OffsetIndex] = {
- val index = LazyIndex.forOffset(offsetIndexFile, offset, Int.MaxValue, false)
- index.get.sanityCheck()
- index
- }
+ val offsetIndex = new OffsetIndex(offsetIndexFile, offset, Int.MaxValue, false)
+ offsetIndex.sanityCheck()
- val timeIndex: LazyIndex[TimeIndex] = {
- val index = LazyIndex.forTime(timestampIndexFile, offset, Int.MaxValue, false)
- index.get.sanityCheck()
- index
- }
+ val timeIndex = new TimeIndex(timestampIndexFile, offset, Int.MaxValue, false)
+ timeIndex.sanityCheck()
- val txnIndex: TransactionIndex = {
- val index = new TransactionIndex(offset, txnIndexFile)
- index.sanityCheck()
- index
- }
+ val txnIndex = new TransactionIndex(offset, txnIndexFile)
+ txnIndex.sanityCheck()
val entry = new Entry(offsetIndex, timeIndex, txnIndex)
entries.put(uuid, entry)
@@ -238,19 +229,19 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
// uuid.toString uses URL encoding which is safe for filenames and URLs.
val fileName = startOffset.toString + "_" + uuid.toString + "_"
- val offsetIndex: LazyIndex[OffsetIndex] = loadIndexFile(fileName, UnifiedLog.IndexFileSuffix,
+ val offsetIndex: OffsetIndex = loadIndexFile(fileName, UnifiedLog.IndexFileSuffix,
rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET),
file => {
- val index = LazyIndex.forOffset(file, startOffset, Int.MaxValue, false)
- index.get.sanityCheck()
+ val index = new OffsetIndex(file, startOffset, Int.MaxValue, false)
+ index.sanityCheck()
index
})
- val timeIndex: LazyIndex[TimeIndex] = loadIndexFile(fileName, UnifiedLog.TimeIndexFileSuffix,
+ val timeIndex: TimeIndex = loadIndexFile(fileName, UnifiedLog.TimeIndexFileSuffix,
rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TIMESTAMP),
file => {
- val index = LazyIndex.forTime(file, startOffset, Int.MaxValue, false)
- index.get.sanityCheck()
+ val index = new TimeIndex(file, startOffset, Int.MaxValue, false)
+ index.sanityCheck()
index
})
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 79aab9cd486..c69f8ab5bb1 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -46,8 +46,8 @@ object LogTestUtils {
indexIntervalBytes: Int = 10,
time: Time = Time.SYSTEM): LogSegment = {
val ms = FileRecords.open(UnifiedLog.logFile(logDir, offset))
- val idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(logDir, offset), offset, 1000, true)
- val timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(logDir, offset), offset, 1500, true)
+ val idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(logDir, offset), offset, 1000)
+ val timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(logDir, offset), offset, 1500)
val txnIndex = new TransactionIndex(offset, UnifiedLog.transactionIndexFile(logDir, offset))
new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 0129d67b944..89afa848ddd 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -86,7 +86,7 @@ class RemoteIndexCacheTest {
@Test
def testFetchIndexFromRemoteStorage(): Unit = {
- val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+ val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex
val offsetPosition1 = offsetIndex.entry(1)
// this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
val resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset)
@@ -104,7 +104,7 @@ class RemoteIndexCacheTest {
@Test
def testPositionForNonExistingIndexFromRemoteStorage(): Unit = {
- val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+ val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex
val lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset)
val greaterOffsetThanLastOffset = offsetIndex.lastOffset + 1
assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset))
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java
index b1e16a8f67b..1172bb596e7 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java
@@ -141,26 +141,23 @@ public class LazyIndex<T extends AbstractIndex> {
private final Lock lock = new ReentrantLock();
private final long baseOffset;
private final int maxIndexSize;
- private final boolean writable;
private final IndexType indexType;
private volatile IndexWrapper indexWrapper;
- private LazyIndex(IndexWrapper indexWrapper, long baseOffset, int maxIndexSize, boolean writable,
- IndexType indexType) {
+ private LazyIndex(IndexWrapper indexWrapper, long baseOffset, int maxIndexSize, IndexType indexType) {
this.indexWrapper = indexWrapper;
this.baseOffset = baseOffset;
this.maxIndexSize = maxIndexSize;
- this.writable = writable;
this.indexType = indexType;
}
- public static LazyIndex<OffsetIndex> forOffset(File file, long baseOffset, int maxIndexSize, boolean writable) {
- return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, writable, IndexType.OFFSET);
+ public static LazyIndex<OffsetIndex> forOffset(File file, long baseOffset, int maxIndexSize) {
+ return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, IndexType.OFFSET);
}
- public static LazyIndex<TimeIndex> forTime(File file, long baseOffset, int maxIndexSize, boolean writable) {
- return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, writable, IndexType.TIME);
+ public static LazyIndex<TimeIndex> forTime(File file, long baseOffset, int maxIndexSize) {
+ return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, IndexType.TIME);
}
public File file() {
@@ -234,9 +231,9 @@ public class LazyIndex<T extends AbstractIndex> {
private T loadIndex(File file) throws IOException {
switch (indexType) {
case OFFSET:
- return (T) new OffsetIndex(file, baseOffset, maxIndexSize, writable);
+ return (T) new OffsetIndex(file, baseOffset, maxIndexSize, true);
case TIME:
- return (T) new TimeIndex(file, baseOffset, maxIndexSize, writable);
+ return (T) new TimeIndex(file, baseOffset, maxIndexSize, true);
default:
throw new IllegalStateException("Unexpected indexType " + indexType);
}