You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/10/11 02:58:26 UTC

[kafka] branch trunk updated: KAFKA-15169: Added TestCase in RemoteIndexCache (#14482)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 99ce2e081c5 KAFKA-15169: Added TestCase in RemoteIndexCache (#14482)
99ce2e081c5 is described below

commit 99ce2e081c575eeb6f02a046e3b6530ca6402ad2
Author: Arpit Goyal <59...@users.noreply.github.com>
AuthorDate: Wed Oct 11 08:28:17 2023 +0530

    KAFKA-15169: Added TestCase in RemoteIndexCache (#14482)
    
    Test Cases Covered
    
        1. Index files already exist on disk but not in the cache, i.e. RemoteIndexCache should not call RemoteStorageManager to fetch them; instead it should populate the cache from the local index files already present.
        2. RSM returns a corrupted index file, i.e. RemoteIndexCache should throw CorruptIndexException instead of completing successfully.
        3. Index files with the deleted suffix are already present on disk, i.e. if the cleaner thread is slow, there is a chance that deleted index files are still present on disk while the same index entry is invalidated in parallel. For more details, refer to https://issues.apache.org/jira/browse/KAFKA-15169
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Luke Chen <sh...@gmail.com>, Kamal Chandraprakash <ka...@gmail.com>
---
 .../kafka/log/remote/RemoteIndexCacheTest.scala    | 275 +++++++++++++++++++--
 .../storage/internals/log/RemoteIndexCache.java    |   4 +-
 2 files changed, 260 insertions(+), 19 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index b9482217062..07f07757b31 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -17,25 +17,28 @@
 package kafka.log.remote
 
 import kafka.utils.TestUtils
+import kafka.utils.TestUtils.waitUntilTrue
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
-import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.{File, FileInputStream, IOException, PrintWriter}
-import java.nio.file.Files
+import java.io.{File, FileInputStream, FileNotFoundException, IOException, PrintWriter}
+import java.nio.file.{Files, Paths}
 import java.util
-import java.util.Collections
+import java.util.{Collections, Optional}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import scala.collection.mutable
 
@@ -516,10 +519,14 @@ class RemoteIndexCacheTest {
   def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {
 
     def getIndexFileFromRemoteCacheDir(suffix: String) = {
-      Files.walk(cache.cacheDir())
-        .filter(Files.isRegularFile(_))
-        .filter(path => path.getFileName.toString.endsWith(suffix))
-        .findAny()
+      try {
+        Files.walk(cache.cacheDir().toPath())
+          .filter(Files.isRegularFile(_))
+          .filter(path => path.getFileName.toString.endsWith(suffix))
+          .findAny()
+      } catch {
+        case _: FileNotFoundException => Optional.empty()
+      }
     }
 
     val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
@@ -554,23 +561,223 @@ class RemoteIndexCacheTest {
     assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-    // create Corrupt Offset Index File
-    createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
+    // create Corrupted Index File in remote index cache
+    createCorruptedIndexFile(indexType, cache.cacheDir())
     val entry = cache.getIndexEntry(rlsMetadata)
-    // Test would fail if it throws corrupt Exception
-    val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
+    // Test would fail if it throws Exception other than CorruptIndexException
     val offsetIndexFile = entry.offsetIndex.file().toPath
+    val txnIndexFile = entry.txnIndex.file().toPath
+    val timeIndexFile = entry.timeIndex.file().toPath
+
+    val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
+    val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata)
+    val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata)
 
     assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
+    assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
+    assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
+
     // assert that parent directory for the index files is correct
     assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
-      s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+      s"offsetIndex=$offsetIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
+      s"txnIndex=$txnIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
+      s"timeIndex=$timeIndexFile is created under incorrect parent")
+
     // file is corrupted it should fetch from remote storage again
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
+    reset(rsm)
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // Create corrupted index file
+        createCorruptTimeIndexOffsetFile(tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+
+    assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
+    assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
+    // Current status
+    // (cache is null)
+    // RemoteCacheDir contain
+    // 1. Offset Index File is fine and not corrupted
+    // 2. Time Index File is corrupted
+    // What should be the code flow in next execution
+    // 1. No rsm call for fetching OffSet Index File.
+    // 2. Time index file should be fetched from remote storage again as it is corrupted in the first execution.
+    // 3. Transaction index file should be fetched from remote storage.
+    reset(rsm)
+    // delete all files created in tpDir
+    Files.walk(tpDir.toPath, 1)
+      .filter(Files.isRegularFile(_))
+      .forEach(path => Files.deleteIfExists(path))
+    // rsm should return no corrupted file in the 2nd execution
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+    cache.getIndexEntry(rlsMetadata)
+    // rsm should not be called to fetch offset Index
+    verifyFetchIndexInvocation(0, Seq(IndexType.OFFSET))
+    verifyFetchIndexInvocation(1, Seq(IndexType.TIMESTAMP))
+    // Transaction index would be fetched again
+    // as previous getIndexEntry failed before fetchTransactionIndex
+    verifyFetchIndexInvocation(1, Seq(IndexType.TRANSACTION))
+  }
+
+  @Test
+  def testIndexFileAlreadyExistOnDiskButNotInCache(): Unit = {
+    val remoteIndexCacheDir = cache.cacheDir()
+    val tempSuffix = ".tmptest"
+
+    def getRemoteCacheIndexFileFromDisk(suffix: String) = {
+      Files.walk(remoteIndexCacheDir.toPath)
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    }
+
+    def renameRemoteCacheIndexFileFromDisk(suffix: String) = {
+      Files.walk(remoteIndexCacheDir.toPath)
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .forEach(f => Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix))))
+    }
+
+    val entry = cache.getIndexEntry(rlsMetadata)
+    verifyFetchIndexInvocation(count = 1)
+    // copy files with temporary name
+    Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", tempSuffix)))
+    Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
+    Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))
+
+    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+
+    // wait until entry is marked for deletion
+    TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
+      "Failed to mark cache entry for cleanup after invalidation")
+    TestUtils.waitUntilTrue(() => entry.isCleanStarted,
+      "Failed to cleanup cache entry after invalidation")
+
+    // restore index files
+    renameRemoteCacheIndexFileFromDisk(tempSuffix)
+    // validate cache entry for the above key should be null
+    assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    cache.getIndexEntry(rlsMetadata)
+    // Index  Files already exist ,rsm should not fetch them again.
+    verifyFetchIndexInvocation(count = 1)
+    // verify index files on disk
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testRSMReturnCorruptedIndexFile(testIndexType: IndexType): Unit = {
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // Create corrupt index file return from RSM
+        createCorruptedIndexFile(testIndexType, tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+    assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
+  }
+
+  @Test
+  def testConcurrentCacheDeletedFileExists(): Unit = {
+    val remoteIndexCacheDir = cache.cacheDir()
+
+    def getRemoteCacheIndexFileFromDisk(suffix: String) = {
+      Files.walk(remoteIndexCacheDir.toPath)
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    }
+
+    val entry = cache.getIndexEntry(rlsMetadata)
+    // verify index files on disk
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+
+    // Simulating a concurrency issue where deleted files already exist on disk
+    // This happen when cleanerThread is slow and not able to delete index entries
+    // while same index Entry is cached again and invalidated.
+    // The new deleted file created should be replaced by existing deleted file.
+
+    // create deleted suffix file
+    Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
+    Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
+    Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
+
+    // verify deleted file exists on disk
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+
+    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+
+    // wait until entry is marked for deletion
+    TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
+      "Failed to mark cache entry for cleanup after invalidation")
+    TestUtils.waitUntilTrue(() => entry.isCleanStarted,
+      "Failed to cleanup cache entry after invalidation")
+
+    // verify no index files on disk
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+      s"Offset index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+      s"Txn index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+      s"Time index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+      s"Index file marked for deletion should not be present on disk at ${remoteIndexCacheDir.toPath}")
+  }
+
   private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
                                     = RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
     val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
@@ -611,6 +818,22 @@ class RemoteIndexCacheTest {
     new TransactionIndex(metadata.startOffset(), txnIdxFile)
   }
 
+  private def createCorruptTxnIndexForSegmentMetadata(dir: File, metadata: RemoteLogSegmentMetadata): TransactionIndex = {
+    val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
+    txnIdxFile.createNewFile()
+    val txnIndex = new TransactionIndex(metadata.startOffset(), txnIdxFile)
+    val abortedTxns = List(
+      new AbortedTxn(0L, 0, 10, 11),
+      new AbortedTxn(1L, 5, 15, 13),
+      new AbortedTxn(2L, 18, 35, 25),
+      new AbortedTxn(3L, 32, 50, 40))
+    abortedTxns.foreach(txnIndex.append)
+    txnIndex.close()
+
+    // open the index with a different starting offset to fake invalid data
+    return new TransactionIndex(100L, txnIdxFile)
+  }
+
   private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
     new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
@@ -680,11 +903,29 @@ class RemoteIndexCacheTest {
       })
   }
 
-  private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
-    val pw =  new PrintWriter(remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata))
+  private def createCorruptOffsetIndexFile(dir: File): Unit = {
+    val pw = new PrintWriter(remoteOffsetIndexFile(dir, rlsMetadata))
     pw.write("Hello, world")
     // The size of the string written in the file is 12 bytes,
     // but it should be multiple of Offset Index EntrySIZE which is equal to 8.
     pw.close()
   }
+
+  private def createCorruptTimeIndexOffsetFile(dir: File): Unit = {
+    val pw = new PrintWriter(remoteTimeIndexFile(dir, rlsMetadata))
+    pw.write("Hello, world1")
+    // The size of the string written in the file is 13 bytes,
+    // but it should be multiple of Time Index EntrySIZE which is equal to 12.
+    pw.close()
+  }
+
+  private def createCorruptedIndexFile(indexType: IndexType, dir: File): Unit = {
+    if (indexType == IndexType.OFFSET) {
+      createCorruptOffsetIndexFile(dir)
+    } else if (indexType == IndexType.TIMESTAMP) {
+      createCorruptTimeIndexOffsetFile(dir)
+    } else if (indexType == IndexType.TRANSACTION) {
+      createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata)
+    }
+  }
 }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 1961207f787..42871698318 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -186,8 +186,8 @@ public class RemoteIndexCache implements Closeable {
     }
 
     // Visible for testing
-    public Path cacheDir() {
-        return cacheDir.toPath();
+    public File cacheDir() {
+        return cacheDir;
     }
 
     public void remove(Uuid key) {