You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/09/12 13:12:11 UTC
[kafka] branch 3.6 updated: KAFKA-14993: Improve TransactionIndex instance handling while copying to and fetching from RSM (#14363)
This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 9c44f705b30 KAFKA-14993: Improve TransactionIndex instance handling while copying to and fetching from RSM (#14363)
9c44f705b30 is described below
commit 9c44f705b30f734cb24494da6f233e35988088f4
Author: Abhijeet Kumar <ab...@gmail.com>
AuthorDate: Tue Sep 12 17:54:20 2023 +0530
KAFKA-14993: Improve TransactionIndex instance handling while copying to and fetching from RSM (#14363)
- Updated the contract for RSM's fetchIndex to throw a RemoteResourceNotFoundException instead of returning an empty InputStream when it does not have a TransactionIndex.
- Updated the LocalTieredStorage implementation to adhere to the new contract.
- Added Unit Tests for the change.
Reviewers: Satish Duggana <sa...@apache.org>, Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@amazon.com>, Christo Lolov <lo...@amazon.com>, Kamal Chandraprakash <ka...@gmail.com>
---
.../kafka/log/remote/RemoteIndexCacheTest.scala | 27 +++++++++++++++++++++-
.../log/remote/storage/RemoteStorageManager.java | 2 +-
.../storage/internals/log/RemoteIndexCache.java | 6 +++++
.../log/remote/storage/LocalTieredStorage.java | 11 ++++++---
.../log/remote/storage/LocalTieredStorageTest.java | 8 +------
5 files changed, 42 insertions(+), 12 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 007829c529b..2c0e6389221 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -20,7 +20,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
-import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
+import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
@@ -143,6 +143,31 @@ class RemoteIndexCacheTest {
verifyNoInteractions(rsm)
}
+ @Test
+ def testFetchIndexForMissingTransactionIndex(): Unit = {
+ when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+ .thenAnswer(ans => {
+ val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+ val indexType = ans.getArgument[IndexType](1)
+ val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+ val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+ maybeAppendIndexEntries(offsetIdx, timeIdx)
+ indexType match {
+ case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+ case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+ // Throw RemoteResourceNotFoundException since transaction index is not available
+ case IndexType.TRANSACTION => throw new RemoteResourceNotFoundException("txn index not found")
+ case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+ case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+ }
+ })
+
+ val entry = cache.getIndexEntry(rlsMetadata)
+ // Verify an empty file is created in the cache directory
+ assertTrue(entry.txnIndex().file().exists())
+ assertEquals(0, entry.txnIndex().file().length())
+ }
+
@Test
def testPositionForNonExistingIndexFromRemoteStorage(): Unit = {
val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
index 9d06617bd66..33918017560 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
@@ -123,7 +123,7 @@ public interface RemoteStorageManager extends Configurable, Closeable {
* Returns the index for the respective log segment of {@link RemoteLogSegmentMetadata}.
* <p>
* Note: The transaction index may not exist because of no transactional records.
- * In this case, it should still return an InputStream with empty content, instead of returning {@code null}.
+ * In this case, it should throw a RemoteResourceNotFoundException, instead of returning {@code null}.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @param indexType type of the index to be fetched for the segment.
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 215beabe9f6..455913947b3 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
@@ -32,6 +33,7 @@ import org.apache.kafka.server.util.ShutdownableThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@@ -382,6 +384,10 @@ public class RemoteIndexCache implements Closeable {
TransactionIndex txnIndex = loadIndexFile(txnIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
try {
return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION);
+ } catch (RemoteResourceNotFoundException e) {
+ // Don't throw an exception since the transaction index may not exist because of no transactional
+ // records. Instead, we return an empty stream so that an empty file is created in the cache
+ return new ByteArrayInputStream(new byte[0]);
} catch (RemoteStorageException e) {
throw new KafkaException(e);
}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index 64131d15559..9a643a90572 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -26,7 +26,6 @@ import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@@ -381,8 +380,14 @@ public final class LocalTieredStorage implements RemoteStorageManager {
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata);
File file = fileset.getFile(fileType);
- final InputStream inputStream = (fileType.isOptional() && !file.exists()) ?
- new ByteArrayInputStream(new byte[0]) : newInputStream(file.toPath(), READ);
+
+ final InputStream inputStream;
+ if (fileType.isOptional() && !file.exists()) {
+ throw new RemoteResourceNotFoundException("Index file for type: " + indexType +
+ " not found for segment " + metadata.remoteLogSegmentId());
+ } else {
+ inputStream = newInputStream(file.toPath(), READ);
+ }
storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build());
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
index 273ce6ce2a0..00ac899c349 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
@@ -70,7 +70,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.LEADER_EPOCH;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.OFFSET;
@@ -347,12 +346,7 @@ public final class LocalTieredStorageTest {
assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, TIMESTAMP));
assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, LEADER_EPOCH));
assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, PRODUCER_SNAPSHOT));
-
- try {
- assertArrayEquals(new byte[0], remoteStorageVerifier.readFully(tieredStorage.fetchIndex(metadata, TRANSACTION)));
- } catch (Exception ex) {
- fail("Shouldn't have thrown an exception when optional file doesn't exists in the remote store");
- }
+ assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, TRANSACTION));
}
@Test