You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/09/12 13:12:11 UTC

[kafka] branch 3.6 updated: KAFKA-14993: Improve TransactionIndex instance handling while copying to and fetching from RSM (#14363)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 9c44f705b30 KAFKA-14993: Improve TransactionIndex instance handling while copying to and fetching from RSM (#14363)
9c44f705b30 is described below

commit 9c44f705b30f734cb24494da6f233e35988088f4
Author: Abhijeet Kumar <ab...@gmail.com>
AuthorDate: Tue Sep 12 17:54:20 2023 +0530

    KAFKA-14993: Improve TransactionIndex instance handling while copying to and fetching from RSM (#14363)
    
    - Updated the contract for RSM's fetchIndex to throw a RemoteResourceNotFoundException instead of returning an empty InputStream when it does not have a TransactionIndex.
    - Updated the LocalTieredStorage implementation to adhere to the new contract.
    - Added Unit Tests for the change.
    
    Reviewers: Satish Duggana <sa...@apache.org>, Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@amazon.com>, Christo Lolov <lo...@amazon.com>, Kamal Chandraprakash <ka...@gmail.com>
---
 .../kafka/log/remote/RemoteIndexCacheTest.scala    | 27 +++++++++++++++++++++-
 .../log/remote/storage/RemoteStorageManager.java   |  2 +-
 .../storage/internals/log/RemoteIndexCache.java    |  6 +++++
 .../log/remote/storage/LocalTieredStorage.java     | 11 ++++++---
 .../log/remote/storage/LocalTieredStorageTest.java |  8 +------
 5 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 007829c529b..2c0e6389221 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -20,7 +20,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
-import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
+import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
 import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
@@ -143,6 +143,31 @@ class RemoteIndexCacheTest {
     verifyNoInteractions(rsm)
   }
 
+  @Test
+  def testFetchIndexForMissingTransactionIndex(): Unit = {
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          // Throw RemoteResourceNotFoundException since transaction index is not available
+          case IndexType.TRANSACTION => throw new RemoteResourceNotFoundException("txn index not found")
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+
+    val entry = cache.getIndexEntry(rlsMetadata)
+    // Verify an empty file is created in the cache directory
+    assertTrue(entry.txnIndex().file().exists())
+    assertEquals(0, entry.txnIndex().file().length())
+  }
+
   @Test
   def testPositionForNonExistingIndexFromRemoteStorage(): Unit = {
     val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
index 9d06617bd66..33918017560 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
@@ -123,7 +123,7 @@ public interface RemoteStorageManager extends Configurable, Closeable {
      * Returns the index for the respective log segment of {@link RemoteLogSegmentMetadata}.
      * <p>
      * Note: The transaction index may not exist because of no transactional records.
-     * In this case, it should still return an InputStream with empty content, instead of returning {@code null}.
+     * In this case, it should throw a RemoteResourceNotFoundException, instead of returning {@code null}.
      *
      * @param remoteLogSegmentMetadata metadata about the remote log segment.
      * @param indexType                type of the index to be fetched for the segment.
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 215beabe9f6..455913947b3 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
@@ -32,6 +33,7 @@ import org.apache.kafka.server.util.ShutdownableThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -382,6 +384,10 @@ public class RemoteIndexCache implements Closeable {
             TransactionIndex txnIndex = loadIndexFile(txnIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
                 try {
                     return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION);
+                } catch (RemoteResourceNotFoundException e) {
+                    // Don't throw an exception since the transaction index may not exist because of no transactional
+                    // records. Instead, we return an empty stream so that an empty file is created in the cache
+                    return new ByteArrayInputStream(new byte[0]);
                 } catch (RemoteStorageException e) {
                     throw new KafkaException(e);
                 }
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index 64131d15559..9a643a90572 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -26,7 +26,6 @@ import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -381,8 +380,14 @@ public final class LocalTieredStorage implements RemoteStorageManager {
                 final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata);
 
                 File file = fileset.getFile(fileType);
-                final InputStream inputStream = (fileType.isOptional() && !file.exists()) ?
-                        new ByteArrayInputStream(new byte[0]) : newInputStream(file.toPath(), READ);
+
+                final InputStream inputStream;
+                if (fileType.isOptional() && !file.exists()) {
+                    throw new RemoteResourceNotFoundException("Index file for type: " + indexType +
+                        " not found for segment " + metadata.remoteLogSegmentId());
+                } else {
+                    inputStream = newInputStream(file.toPath(), READ);
+                }
 
                 storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build());
 
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
index 273ce6ce2a0..00ac899c349 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
@@ -70,7 +70,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.LEADER_EPOCH;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.OFFSET;
@@ -347,12 +346,7 @@ public final class LocalTieredStorageTest {
         assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, TIMESTAMP));
         assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, LEADER_EPOCH));
         assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, PRODUCER_SNAPSHOT));
-
-        try {
-            assertArrayEquals(new byte[0], remoteStorageVerifier.readFully(tieredStorage.fetchIndex(metadata, TRANSACTION)));
-        } catch (Exception ex) {
-            fail("Shouldn't have thrown an exception when optional file doesn't exists in the remote store");
-        }
+        assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, TRANSACTION));
     }
 
     @Test