You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/07/28 05:40:44 UTC

[kafka] branch trunk updated: KAFKA-14038: Optimise calculation of size for log in remote tier (#14049)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 722b259961c KAFKA-14038: Optimise calculation of size for log in remote tier (#14049)
722b259961c is described below

commit 722b259961c39da83db254eea54c7b87249ab1ad
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Fri Jul 28 06:40:37 2023 +0100

    KAFKA-14038: Optimise calculation of size for log in remote tier (#14049)
    
    Reviewers: Kamal Chandraprakash <ka...@gmail.com>, Divij Vaidya <di...@amazon.com>, Luke Chen <sh...@gmail.com>, Satish Duggana <sa...@apache.org>
---
 .../remote/storage/RemoteLogMetadataManager.java   |  9 +++
 .../storage/NoOpRemoteLogMetadataManager.java      |  5 ++
 .../ClassLoaderAwareRemoteLogMetadataManager.java  |  5 ++
 .../TopicBasedRemoteLogMetadataManager.java        | 17 +++++
 .../TopicBasedRemoteLogMetadataManagerTest.java    | 83 ++++++++++++++++++++++
 ...RemoteLogMetadataManagerWrapperWithHarness.java |  5 ++
 .../storage/InmemoryRemoteLogMetadataManager.java  | 12 ++++
 7 files changed, 136 insertions(+)

diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
index 9a29746b292..9ae36eb00d8 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
@@ -201,4 +201,13 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable {
      * @param partitions topic partitions that have been stopped.
      */
     void onStopPartitions(Set<TopicIdPartition> partitions);
+
+    /**
+     * Returns the total size, in bytes, of the remote log segments of the given partition that belong to the given leader epoch.
+     * @param topicIdPartition topic partition for which the size needs to be calculated.
+     * @param leaderEpoch leader epoch to filter by; only segments belonging to this epoch are counted.
+     * @return total size in bytes of the matching segments stored in remote storage.
+     * @throws RemoteStorageException if the remote log segment metadata for the partition cannot be retrieved.
+     */
+    long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException;
 }
\ No newline at end of file
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
index 900d5bd5c69..71881517182 100644
--- a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
@@ -74,6 +74,11 @@ public class NoOpRemoteLogMetadataManager implements RemoteLogMetadataManager {
     public void onStopPartitions(Set<TopicIdPartition> partitions) {
     }
 
+    @Override
+    public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) {
+        return 0L; // this no-op manager reports no remote data, so the size is always zero
+    }
+
     @Override
     public void close() throws IOException {
     }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
index 6b465851589..663d06275c5 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
@@ -101,6 +101,11 @@ public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetada
         });
     }
 
+    @Override
+    public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
+        return withClassLoader(() -> { return delegate.remoteLogSize(topicIdPartition, leaderEpoch); }); // delegate call runs inside withClassLoader
+    }
+
     @Override
     public void configure(Map<String, ?> configs) {
         withClassLoader(() -> {
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index ffd6e145039..9f9ef6a63f3 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -326,6 +326,23 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
         }
     }
 
+    @Override
+    public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
+        // Sum segmentSizeInBytes() over every segment the metadata store lists for this partition and leader epoch.
+        // This approach favours simplicity over optimality. The remote metadata store of
+        // TopicBasedRemoteLogMetadataManager is file-based, and the design discussion at
+        // https://lists.apache.org/thread/kxd6fffq02thbpd0p5y4mfbs062g7jr6
+        // concluded that sequentially iterating over files on the local file system is fast enough.
+        // Should that stop being true, the size could instead be tracked with counters that are
+        // incremented/decremented during API calls.
+        Iterator<RemoteLogSegmentMetadata> segments = remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch);
+        long totalBytes = 0L;
+        while (segments.hasNext()) {
+            totalBytes += segments.next().segmentSizeInBytes();
+        }
+        return totalBytes;
+    }
+
     @Override
     public void configure(Map<String, ?> configs) {
         Objects.requireNonNull(configs, "configs can not be null.");
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index a41a9a38699..eaf62edfea7 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -168,4 +169,86 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         }
     }
 
+    @Test
+    public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() {
+        TopicIdPartition unknownPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
+        Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(unknownPartition, 0)); // partition never assigned in this test
+    }
+
+    @Test
+    public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException {
+        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
+        TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();
+
+        RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(0, 0L));
+        RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+                200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(0, 0L));
+
+        topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
+        topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
+        topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(thirdSegmentMetadata);
+
+        topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
+
+        // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
+        // fetching those events and build the cache.
+        waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);
+
+        Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0);
+
+        Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize);
+    }
+
+    @Test
+    public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException {
+        TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
+        TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+        // One segment per leader epoch (0, 1, 2), each a different multiple of SEG_SIZE.
+        RemoteLogSegmentMetadata epochZeroSegment = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(partition, Uuid.randomUuid()),
+                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        RemoteLogSegmentMetadata epochOneSegment = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(partition, Uuid.randomUuid()),
+                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L));
+        RemoteLogSegmentMetadata epochTwoSegment = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(partition, Uuid.randomUuid()),
+                200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(2, 200L));
+
+        rlmm.addRemoteLogSegmentMetadata(epochZeroSegment);
+        rlmm.addRemoteLogSegmentMetadata(epochOneSegment);
+        rlmm.addRemoteLogSegmentMetadata(epochTwoSegment);
+
+        rlmm.onPartitionLeadershipChanges(Collections.singleton(partition), Collections.emptySet());
+
+        // The segment metadata events are already published; topicBasedRlmm's consumer manager will now
+        // fetch them and build its cache, so wait until it has caught up.
+        waitUntilConsumerCatchesUp(partition, partition, 30_000L);
+        // Each epoch must account only for its own segment.
+        Assertions.assertEquals(SEG_SIZE, rlmm.remoteLogSize(partition, 0));
+        Assertions.assertEquals(SEG_SIZE * 2, rlmm.remoteLogSize(partition, 1));
+        Assertions.assertEquals(SEG_SIZE * 3, rlmm.remoteLogSize(partition, 2));
+    }
+
+    @Test
+    public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException {
+        TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
+        TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+        // Segments exist only for leader epochs 0 and 1.
+        RemoteLogSegmentMetadata epochZeroSegment = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(partition, Uuid.randomUuid()),
+                0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        RemoteLogSegmentMetadata epochOneSegment = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(partition, Uuid.randomUuid()),
+                100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L));
+
+        rlmm.addRemoteLogSegmentMetadata(epochZeroSegment);
+        rlmm.addRemoteLogSegmentMetadata(epochOneSegment);
+
+        rlmm.onPartitionLeadershipChanges(Collections.singleton(partition), Collections.emptySet());
+
+        // The segment metadata events are already published; topicBasedRlmm's consumer manager will now
+        // fetch them and build its cache, so wait until it has caught up.
+        waitUntilConsumerCatchesUp(partition, partition, 30_000L);
+
+        Assertions.assertEquals(0, rlmm.remoteLogSize(partition, 9001)); // no segment belongs to epoch 9001
+    }
+
 }
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
index ef9287d31e4..e73ac31b16c 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
@@ -86,6 +86,11 @@ public class TopicBasedRemoteLogMetadataManagerWrapperWithHarness implements Rem
         remoteLogMetadataManagerHarness.remoteLogMetadataManager().onStopPartitions(partitions);
     }
 
+    @Override
+    public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager().remoteLogSize(topicIdPartition, leaderEpoch); // forward to the manager held by the harness
+    }
+
     @Override
     public void close() throws IOException {
         remoteLogMetadataManagerHarness.remoteLogMetadataManager().close();
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
index a970de509ad..7cc4552427e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
@@ -156,6 +156,18 @@ public class InmemoryRemoteLogMetadataManager implements RemoteLogMetadataManage
         // this instance. It does not depend upon stopped partitions.
     }
 
+    /** Sums the sizes of the partition's remote log segments that belong to {@code leaderEpoch}. */
+    @Override
+    public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
+        // Filter by epoch as the RemoteLogMetadataManager contract requires; listAllRemoteLogSegments() ignores leaderEpoch.
+        Iterator<RemoteLogSegmentMetadata> segments = getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
+        long remoteLogSize = 0L;
+        while (segments.hasNext()) {
+            remoteLogSize += segments.next().segmentSizeInBytes();
+        }
+        return remoteLogSize;
+    }
+
     @Override
     public void close() throws IOException {
         // Clearing the references to the map and assigning empty immutable maps.