You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/07/28 05:40:44 UTC
[kafka] branch trunk updated: KAFKA-14038: Optimise calculation of size for log in remote tier (#14049)
This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 722b259961c KAFKA-14038: Optimise calculation of size for log in remote tier (#14049)
722b259961c is described below
commit 722b259961c39da83db254eea54c7b87249ab1ad
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Fri Jul 28 06:40:37 2023 +0100
KAFKA-14038: Optimise calculation of size for log in remote tier (#14049)
Reviewers: Kamal Chandraprakash<ka...@gmail.com>, Divij Vaidya <di...@amazon.com>, Luke Chen <sh...@gmail.com>, Satish Duggana <sa...@apache.org>
---
.../remote/storage/RemoteLogMetadataManager.java | 9 +++
.../storage/NoOpRemoteLogMetadataManager.java | 5 ++
.../ClassLoaderAwareRemoteLogMetadataManager.java | 5 ++
.../TopicBasedRemoteLogMetadataManager.java | 17 +++++
.../TopicBasedRemoteLogMetadataManagerTest.java | 83 ++++++++++++++++++++++
...RemoteLogMetadataManagerWrapperWithHarness.java | 5 ++
.../storage/InmemoryRemoteLogMetadataManager.java | 12 ++++
7 files changed, 136 insertions(+)
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
index 9a29746b292..9ae36eb00d8 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
@@ -201,4 +201,13 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable {
* @param partitions topic partitions that have been stopped.
*/
void onStopPartitions(Set<TopicIdPartition> partitions);
+
+ /**
+ * Returns the total size of the log for the given leader epoch in remote storage.
+ *
+ * @param topicIdPartition topic partition for which size needs to be calculated.
+ * @param leaderEpoch leader epoch to filter by; the returned size only includes segments belonging to this epoch.
+ * @return Total size of the log stored in remote storage in bytes.
+ */
+ long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException;
}
\ No newline at end of file
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
index 900d5bd5c69..71881517182 100644
--- a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
@@ -74,6 +74,11 @@ public class NoOpRemoteLogMetadataManager implements RemoteLogMetadataManager {
public void onStopPartitions(Set<TopicIdPartition> partitions) {
}
+ @Override
+ public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) {
+ return 0;
+ }
+
@Override
public void close() throws IOException {
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
index 6b465851589..663d06275c5 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
@@ -101,6 +101,11 @@ public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetada
});
}
+ @Override
+ public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
+ return withClassLoader(() -> delegate.remoteLogSize(topicIdPartition, leaderEpoch));
+ }
+
@Override
public void configure(Map<String, ?> configs) {
withClassLoader(() -> {
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index ffd6e145039..9f9ef6a63f3 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -326,6 +326,23 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
}
}
+ @Override
+ public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
+ long remoteLogSize = 0L;
+ // This is a simple-to-understand but not the most optimal solution.
+ // The TopicBasedRemoteLogMetadataManager's remote metadata store is file-based. During design discussions
+ // at https://lists.apache.org/thread/kxd6fffq02thbpd0p5y4mfbs062g7jr6
+ // we reached a consensus that sequential iteration over files on the local file system is performant enough.
+ // Should this stop being the case, the remote log size could be calculated by incrementing/decrementing
+ // counters during API calls for a more performant implementation.
+ Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator = remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch);
+ while (remoteLogSegmentMetadataIterator.hasNext()) {
+ RemoteLogSegmentMetadata remoteLogSegmentMetadata = remoteLogSegmentMetadataIterator.next();
+ remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes();
+ }
+ return remoteLogSize;
+ }
+
@Override
public void configure(Map<String, ?> configs) {
Objects.requireNonNull(configs, "configs can not be null.");
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index a41a9a38699..eaf62edfea7 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -168,4 +169,86 @@ public class TopicBasedRemoteLogMetadataManagerTest {
}
}
+ @Test
+ public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() {
+ TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
+ Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(topicIdPartition, 0));
+ }
+
+ @Test
+ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException {
+ TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
+ TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();
+
+ RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+ 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+ RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+ 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(0, 0L));
+ RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+ 200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(0, 0L));
+
+ topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
+ topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
+ topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(thirdSegmentMetadata);
+
+ topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
+
+ // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
+ // fetching those events and build the cache.
+ waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);
+
+ Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0);
+
+ Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize);
+ }
+
+ @Test
+ public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException {
+ TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
+ TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();
+
+ RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+ 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+ RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+ 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L));
+ RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+ 200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(2, 200L));
+
+ topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
+ topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
+ topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(thirdSegmentMetadata);
+
+ topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
+
+ // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
+ // fetching those events and build the cache.
+ waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);
+
+ Assertions.assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0));
+ Assertions.assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1));
+ Assertions.assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2));
+ }
+
+ @Test
+ public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException {
+ TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
+ TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();
+
+ RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+ 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+ RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+ 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L));
+
+ topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
+ topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
+
+ topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
+
+ // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
+ // fetching those events and build the cache.
+ waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);
+
+ Assertions.assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001));
+ }
+
}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
index ef9287d31e4..e73ac31b16c 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
@@ -86,6 +86,11 @@ public class TopicBasedRemoteLogMetadataManagerWrapperWithHarness implements Rem
remoteLogMetadataManagerHarness.remoteLogMetadataManager().onStopPartitions(partitions);
}
+ @Override
+ public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
+ return remoteLogMetadataManagerHarness.remoteLogMetadataManager().remoteLogSize(topicIdPartition, leaderEpoch);
+ }
+
@Override
public void close() throws IOException {
remoteLogMetadataManagerHarness.remoteLogMetadataManager().close();
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
index a970de509ad..7cc4552427e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
@@ -156,6 +156,18 @@ public class InmemoryRemoteLogMetadataManager implements RemoteLogMetadataManage
// this instance. It does not depend upon stopped partitions.
}
+ @Override
+ public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
+ long remoteLogSize = 0L;
+ RemoteLogMetadataCache remoteLogMetadataCache = getRemoteLogMetadataCache(topicIdPartition);
+ Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator = remoteLogMetadataCache.listAllRemoteLogSegments();
+ while (remoteLogSegmentMetadataIterator.hasNext()) {
+ RemoteLogSegmentMetadata remoteLogSegmentMetadata = remoteLogSegmentMetadataIterator.next();
+ remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes();
+ }
+ return remoteLogSize;
+ }
+
@Override
public void close() throws IOException {
// Clearing the references to the map and assigning empty immutable maps.