Posted to commits@kafka.apache.org by sh...@apache.org on 2023/10/19 03:29:15 UTC

[kafka] branch 3.6 updated: KAFKA-15479: Remote log segments should be considered once for retention breach (#14407)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new b83a71e0531 KAFKA-15479: Remote log segments should be considered once for retention breach (#14407)
b83a71e0531 is described below

commit b83a71e0531574608fd5f4607e045ca192251fb0
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Mon Sep 25 17:41:53 2023 +0530

    KAFKA-15479: Remote log segments should be considered once for retention breach (#14407)
    
    When a remote log segment contains multiple epochs, it gets considered multiple times during retention breach checks by size/time/start-offset. This affects deletion by remote log retention size, as it deletes fewer segments than expected. This is a follow-up to KAFKA-15352.
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Christo Lolov <lo...@amazon.com>, Satish Duggana <sa...@apache.org>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  90 ++---
 .../kafka/log/remote/RemoteLogManagerTest.java     | 369 ++++++++++++---------
 2 files changed, 252 insertions(+), 207 deletions(-)
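
In brief, the problem being fixed: RemoteLogMetadataManager#listRemoteLogSegments(topicIdPartition, leaderEpoch) returns a segment once for every leader epoch it spans, so a multi-epoch segment was charged against the breached retention size repeatedly, and fewer segments than expected got deleted. Below is a self-contained sketch of the dedup guard, using hypothetical types rather than the committed code:

    import java.util.*;

    public class RetentionDedupSketch {
        record Segment(String id, long sizeInBytes, Set<Integer> epochs) { }

        public static void main(String[] args) {
            // One 1 KiB segment spanning epochs 0 and 1, plus one in epoch 1 only.
            Segment a = new Segment("a", 1024L, Set.of(0, 1));
            Segment b = new Segment("b", 1024L, Set.of(1));
            List<Segment> all = List.of(a, b);

            long remainingBreachedSize = 2048L; // both segments breach retention
            List<Segment> segmentsToDelete = new ArrayList<>();
            for (int epoch : new int[] {0, 1}) {
                for (Segment segment : all) {
                    if (!segment.epochs().contains(epoch)) {
                        continue; // mimics per-epoch listing of remote segments
                    }
                    if (segmentsToDelete.contains(segment)) {
                        continue; // the fix: a segment is only considered once
                    }
                    if (remainingBreachedSize > 0) {
                        remainingBreachedSize -= segment.sizeInBytes();
                        segmentsToDelete.add(segment);
                    }
                }
            }
            // Without the guard, "a" is charged at epoch 0 and again at epoch 1,
            // exhausting the breached size before "b" is ever selected.
            System.out.println(segmentsToDelete.size()); // prints 2
        }
    }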

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index d8f2144b3e3..a5a30e6d156 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -561,7 +561,6 @@ public class RemoteLogManager implements Closeable {
             }
             cache.truncateFromEnd(endOffset);
         }
-
         return checkpoint;
     }
 
@@ -706,7 +705,8 @@ public class RemoteLogManager implements Closeable {
             }
         }
 
-        private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
+        private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset)
+                throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
                 CustomMetadataSizeLimitExceededException {
             File logFile = segment.log().file();
             String logFileName = logFile.getName();
@@ -832,13 +832,11 @@ public class RemoteLogManager implements Closeable {
                 remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
             }
 
-            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) {
+            private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata) {
+                boolean shouldDeleteSegment = false;
                 if (!retentionSizeData.isPresent()) {
-                    return false;
+                    return shouldDeleteSegment;
                 }
-
-                boolean shouldDeleteSegment = false;
-
                 // Assumption that segments contain size >= 0
                 if (remainingBreachedSize > 0) {
                     long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
@@ -847,7 +845,6 @@ public class RemoteLogManager implements Closeable {
                         shouldDeleteSegment = true;
                     }
                 }
-
                 if (shouldDeleteSegment) {
                     logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
                     logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
@@ -856,12 +853,12 @@ public class RemoteLogManager implements Closeable {
                 return shouldDeleteSegment;
             }
 
-            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) {
+            public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadata) {
+                boolean shouldDeleteSegment = false;
                 if (!retentionTimeData.isPresent()) {
-                    return false;
+                    return shouldDeleteSegment;
                 }
-
-                boolean shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
+                shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
                 if (shouldDeleteSegment) {
                     remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
                     // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
@@ -873,9 +870,9 @@ public class RemoteLogManager implements Closeable {
                 return shouldDeleteSegment;
             }
 
-            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata,
-                                                                 long logStartOffset,
-                                                                 NavigableMap<Integer, Long> leaderEpochEntries) {
+            private boolean isSegmentBreachByLogStartOffset(RemoteLogSegmentMetadata metadata,
+                                                            long logStartOffset,
+                                                            NavigableMap<Integer, Long> leaderEpochEntries) {
                 boolean shouldDeleteSegment = false;
                 if (!leaderEpochEntries.isEmpty()) {
                     // Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same
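
For intuition, the renamed check reduces to: a remote segment is eligible for deletion once the log start offset has moved past its last offset. A simplified sketch with a hypothetical signature (the committed method also validates against the leader-epoch entries):

    // Simplified sketch; not the committed predicate, which additionally
    // consults the segment's leader-epoch entries.
    static boolean breachedByLogStartOffset(long segmentEndOffset, long logStartOffset) {
        // deletable once every offset in the segment is below the log start offset
        return segmentEndOffset < logStartOffset;
    }
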
@@ -916,10 +913,8 @@ public class RemoteLogManager implements Closeable {
                     remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
                             new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
                                     segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
-
                     // Delete the segment in remote storage.
                     remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
-
                     // Publish delete segment finished event.
                     remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
                             new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
@@ -932,7 +927,7 @@ public class RemoteLogManager implements Closeable {
 
         }
 
-        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+        void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
             if (isCancelled() || !isLeader()) {
                 logger.info("Returning from remote log segments cleanup as the task state is changed");
                 return;
@@ -993,13 +988,15 @@ public class RemoteLogManager implements Closeable {
                         return;
                     }
                     RemoteLogSegmentMetadata metadata = segmentsIterator.next();
-
+                    if (segmentsToDelete.contains(metadata)) {
+                        continue;
+                    }
                     // When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated
                     // as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those
                     // remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether
                     // the epochs present in the segment lies in the checkpoint file. It will always return false
                     // since the checkpoint file was already truncated.
-                    boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
+                    boolean shouldDeleteSegment = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset(
                             metadata, logStartOffset, epochWithOffsets);
                     boolean isValidSegment = false;
                     if (!shouldDeleteSegment) {
@@ -1007,8 +1004,8 @@ public class RemoteLogManager implements Closeable {
                         isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
                         if (isValidSegment) {
                             shouldDeleteSegment =
-                                    remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
-                                            remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
+                                    remoteLogRetentionHandler.isSegmentBreachedByRetentionTime(metadata) ||
+                                            remoteLogRetentionHandler.isSegmentBreachedByRetentionSize(metadata);
                         }
                     }
                     if (shouldDeleteSegment) {
@@ -1018,6 +1015,27 @@ public class RemoteLogManager implements Closeable {
                 }
             }
 
+            // Update log start offset with the computed value after retention cleanup is done
+            remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
+
+            // At this point in time we have updated the log start offsets, but not initiated a deletion.
+            // Either a follower has picked up the changes to the log start offset, or they have not.
+            // If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
+            // the deletion.
+            // However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
+            // and delete them accordingly.
+            // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
+            // again and delete them with the original deletion reason i.e. size, time or log start offset breach.
+            List<String> undeletedSegments = new ArrayList<>();
+            for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
+                if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
+                    undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
+                }
+            }
+            if (!undeletedSegments.isEmpty()) {
+                logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
+            }
+
             // Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known
             // to the leader. This will remove the unreferenced segments in the remote storage. This is needed for
             // unclean leader election scenarios as the remote storage can have epochs earlier to the current leader's
@@ -1040,27 +1058,6 @@ public class RemoteLogManager implements Closeable {
                     }
                 }
             }
-
-            // Update log start offset with the computed value after retention cleanup is done
-            remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
-
-            // At this point in time we have updated the log start offsets, but not initiated a deletion.
-            // Either a follower has picked up the changes to the log start offset, or they have not.
-            // If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
-            // the deletion.
-            // However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
-            // and delete them accordingly.
-            // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
-            // again and delete them with the original deletion reason i.e. size, time or log start offset breach.
-            List<String> undeletedSegments = new ArrayList<>();
-            for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
-                if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
-                    undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
-                }
-            }
-            if (!undeletedSegments.isEmpty()) {
-                logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
-            }
         }
 
         private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
@@ -1179,7 +1176,12 @@ public class RemoteLogManager implements Closeable {
             }
         }
         // segment end offset should be within the log end offset.
-        return segmentEndOffset < logEndOffset;
+        if (segmentEndOffset >= logEndOffset) {
+            LOGGER.debug("Segment {} end offset {} is greater than or equal to log end offset {}.",
+                    segmentMetadata.remoteLogSegmentId(), segmentEndOffset, logEndOffset);
+            return false;
+        }
+        return true;
     }
 
     /**
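
Beyond the renames, the diff above reorders the cleanup into two phases: breached segments are first collected (each considered once), the new log start offset is published, and only then are deletions attempted under a leadership guard, so a cancelled or demoted task leaves the remainder for the next leader. A rough, self-contained sketch of that shape, with hypothetical names rather than the committed API:

    import java.util.*;
    import java.util.function.Predicate;

    public class TwoPhaseCleanupSketch {
        // Stand-in for the segment-delete call; the guard predicate models
        // `x -> !isCancelled() && isLeader()` from the patch.
        static boolean deleteSegment(String segmentId, Predicate<String> canDelete) {
            if (!canDelete.test(segmentId)) {
                return false; // leadership lost or task cancelled; skip for now
            }
            System.out.println("deleted " + segmentId);
            return true;
        }

        public static void main(String[] args) {
            // Phase 1 (not shown): run all retention checks, collect breached
            // segments, and publish the advanced log start offset.
            List<String> segmentsToDelete = List.of("seg-0", "seg-1");
            boolean stillLeader = true;

            // Phase 2: attempt deletions; whatever fails here is picked up on a
            // later pass by whichever replica is leader at that time.
            List<String> undeletedSegments = new ArrayList<>();
            for (String segment : segmentsToDelete) {
                if (!deleteSegment(segment, x -> stillLeader)) {
                    undeletedSegments.add(segment);
                }
            }
            if (!undeletedSegments.isEmpty()) {
                System.out.println("could not delete: " + String.join(",", undeletedSegments));
            }
        }
    }
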
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index bb66994b273..095976626c8 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -67,6 +67,9 @@ import org.apache.kafka.storage.internals.log.TransactionIndex;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.MockedConstruction;
@@ -94,6 +97,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
@@ -177,6 +181,7 @@ public class RemoteLogManagerTest {
             return epochs;
         }
     };
+    private final AtomicLong currentLogStartOffset = new AtomicLong(0L);
 
     private final UnifiedLog mockLog = mock(UnifiedLog.class);
 
@@ -192,7 +197,7 @@ public class RemoteLogManagerTest {
         kafka.utils.TestUtils.clearYammerMetrics();
         remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
                 tp -> Optional.of(mockLog),
-                (topicPartition, offset) -> { },
+                (topicPartition, offset) -> currentLogStartOffset.set(offset),
                 brokerTopicStats) {
             public RemoteStorageManager createRemoteStorageManager() {
                 return remoteStorageManager;
@@ -1247,14 +1252,15 @@ public class RemoteLogManagerTest {
                 segmentEpochs7), logEndOffset, leaderEpochToStartOffset));
 
         // Test a remote segment having larger end offset than the log end offset
-        TreeMap<Integer, Long> segmentEpochs8 = new TreeMap<>();
-        segmentEpochs8.put(1, 15L);
-        segmentEpochs8.put(2, 20L);
-
         assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
                 15,
                 95, // larger than log end offset
-                segmentEpochs8), logEndOffset, leaderEpochToStartOffset));
+                leaderEpochToStartOffset), logEndOffset, leaderEpochToStartOffset));
+
+        assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                15,
+                90, // equal to the log end offset
+                leaderEpochToStartOffset), logEndOffset, leaderEpochToStartOffset));
 
         // Test whether a segment's first offset is earlier to the respective epoch's start offset
         TreeMap<Integer, Long> segmentEpochs9 = new TreeMap<>();
@@ -1509,188 +1515,212 @@ public class RemoteLogManagerTest {
         }
     }
 
-    @Test
-    public void testDeleteRetentionSizeBreachingSegments() throws RemoteStorageException, IOException {
-        AtomicLong logStartOffset = new AtomicLong(0);
-        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
-                tp -> Optional.of(mockLog),
-                (topicPartition, offset) ->  logStartOffset.set(offset),
-                brokerTopicStats) {
-            public RemoteStorageManager createRemoteStorageManager() {
-                return remoteStorageManager;
-            }
-            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
-                return remoteLogMetadataManager;
-            }
-        }) {
-            RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
-            task.convertToLeader(0);
-
-            when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
-            when(mockLog.logEndOffset()).thenReturn(200L);
-
-            List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
-
-            List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
-
-            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
-                    .thenReturn(remoteLogSegmentMetadatas.iterator());
-            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
-                    .thenReturn(remoteLogSegmentMetadatas.iterator())
-                    .thenReturn(remoteLogSegmentMetadatas.iterator())
-                    .thenReturn(remoteLogSegmentMetadatas.iterator());
-
-            checkpoint.write(epochEntries);
-            LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
-            when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+    @ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
+    @CsvSource(value = {"0, -1", "-1, 0"})
+    public void testDeletionOnRetentionBreachedSegments(long retentionSize,
+                                                        long retentionMs)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", retentionMs);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
 
-            Map<String, Long> logProps = new HashMap<>();
-            logProps.put("retention.bytes", 0L);
-            logProps.put("retention.ms", -1L);
-            LogConfig mockLogConfig = new LogConfig(logProps);
-            when(mockLog.config()).thenReturn(mockLogConfig);
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
 
-            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
-                    .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+        List<RemoteLogSegmentMetadata> metadataList =
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
 
-            task.run();
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(0);
+        task.cleanupExpiredRemoteLogSegments();
 
-            assertEquals(200L, logStartOffset.get());
-            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
-            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
-        }
+        assertEquals(200L, currentLogStartOffset.get());
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
     }
 
     @Test
-    public void testDeleteRetentionMsBreachingSegments() throws RemoteStorageException, IOException {
-        AtomicLong logStartOffset = new AtomicLong(0);
-        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
-                tp -> Optional.of(mockLog),
-                (topicPartition, offset) ->  logStartOffset.set(offset),
-                brokerTopicStats) {
-            public RemoteStorageManager createRemoteStorageManager() {
-                return remoteStorageManager;
-            }
-            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
-                return remoteLogMetadataManager;
-            }
-        }) {
-            RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
-            task.convertToLeader(0);
+    public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, ExecutionException, InterruptedException {
+        RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        leaderTask.convertToLeader(0);
 
-            when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
-            when(mockLog.logEndOffset()).thenReturn(200L);
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
 
-            List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
 
-            List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+        List<RemoteLogSegmentMetadata> metadataList =
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
 
-            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
-                    .thenReturn(remoteLogSegmentMetadatas.iterator());
-            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
-                    .thenReturn(remoteLogSegmentMetadatas.iterator())
-                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
 
-            checkpoint.write(epochEntries);
-            LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
-            when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", -1L);
+        logProps.put("retention.ms", 0L);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenAnswer(answer -> {
+                    // cancel the task so that we don't delete the second segment
+                    leaderTask.cancel();
+                    return CompletableFuture.runAsync(() -> {
+                    });
+                });
 
-            Map<String, Long> logProps = new HashMap<>();
-            logProps.put("retention.bytes", -1L);
-            logProps.put("retention.ms", 0L);
-            LogConfig mockLogConfig = new LogConfig(logProps);
-            when(mockLog.config()).thenReturn(mockLogConfig);
+        leaderTask.cleanupExpiredRemoteLogSegments();
 
-            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
-                    .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+        assertEquals(200L, currentLogStartOffset.get());
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+        verify(remoteStorageManager, never()).deleteLogSegmentData(metadataList.get(1));
 
-            task.run();
+        // test that the 2nd log segment will be deleted by the new leader
+        RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
+        newLeaderTask.convertToLeader(1);
 
-            assertEquals(200L, logStartOffset.get());
-            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
-            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
-        }
-    }
+        Iterator<RemoteLogSegmentMetadata> firstIterator = metadataList.iterator();
+        firstIterator.next();
+        Iterator<RemoteLogSegmentMetadata> secondIterator = metadataList.iterator();
+        secondIterator.next();
+        Iterator<RemoteLogSegmentMetadata> thirdIterator = metadataList.iterator();
+        thirdIterator.next();
 
-    @Test
-    public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, IOException {
-        AtomicLong logStartOffset = new AtomicLong(0);
-        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
-                tp -> Optional.of(mockLog),
-                (topicPartition, offset) ->  logStartOffset.set(offset),
-                brokerTopicStats) {
-            public RemoteStorageManager createRemoteStorageManager() {
-                return remoteStorageManager;
-            }
-            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
-                return remoteLogMetadataManager;
-            }
-        }) {
-            RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
-            leaderTask.convertToLeader(0);
-
-            when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
-            when(mockLog.logEndOffset()).thenReturn(200L);
-
-            List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
-
-            List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
-
-            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
-                    .thenReturn(remoteLogSegmentMetadatas.iterator());
-            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
-                    .thenReturn(remoteLogSegmentMetadatas.iterator())
-                    .thenReturn(remoteLogSegmentMetadatas.iterator());
-
-            checkpoint.write(epochEntries);
-            LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
-            when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
-
-            Map<String, Long> logProps = new HashMap<>();
-            logProps.put("retention.bytes", -1L);
-            logProps.put("retention.ms", 0L);
-            LogConfig mockLogConfig = new LogConfig(logProps);
-            when(mockLog.config()).thenReturn(mockLogConfig);
-
-            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
-                    .thenAnswer(answer -> {
-                        // cancel the task so that we don't delete the second segment
-                        leaderTask.cancel();
-                        return CompletableFuture.runAsync(() -> {
-                        });
-                    });
+        when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition))
+                .thenReturn(firstIterator);
+        when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0))
+                .thenReturn(secondIterator)
+                .thenReturn(thirdIterator);
 
-            leaderTask.run();
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
 
-            assertEquals(200L, logStartOffset.get());
-            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
-            verify(remoteStorageManager, never()).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+        newLeaderTask.cleanupExpiredRemoteLogSegments();
 
-            // test that the 2nd log segment will be deleted by the new leader
-            RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
-            newLeaderTask.convertToLeader(1);
+        assertEquals(200L, currentLogStartOffset.get());
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
+    }
 
-            Iterator<RemoteLogSegmentMetadata> firstIterator = remoteLogSegmentMetadatas.iterator();
-            firstIterator.next();
-            Iterator<RemoteLogSegmentMetadata> secondIterator = remoteLogSegmentMetadatas.iterator();
-            secondIterator.next();
-            Iterator<RemoteLogSegmentMetadata> thirdIterator = remoteLogSegmentMetadatas.iterator();
-            thirdIterator.next();
+    @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}")
+    @CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"})
+    public void testDeleteLogSegmentDueToRetentionSizeBreach(int segmentCount,
+                                                             int deletableSegmentCount)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        int recordsPerSegment = 100;
+        int segmentSize = 1024;
+        List<EpochEntry> epochEntries = Arrays.asList(
+                new EpochEntry(0, 0L),
+                new EpochEntry(1, 20L),
+                new EpochEntry(3, 50L),
+                new EpochEntry(4, 100L)
+        );
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch;
+
+        long localLogSegmentsSize = 512L;
+        long retentionSize = ((long) segmentCount - deletableSegmentCount) * segmentSize + localLogSegmentsSize;
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", -1L);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        long localLogStartOffset = (long) segmentCount * recordsPerSegment;
+        long logEndOffset = ((long) segmentCount * recordsPerSegment) + 1;
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(mockLog.localLogStartOffset()).thenReturn(localLogStartOffset);
+        when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+        when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);
 
-            when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition))
-                    .thenReturn(firstIterator);
-            when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0))
-                    .thenReturn(secondIterator)
-                    .thenReturn(thirdIterator);
+        List<RemoteLogSegmentMetadata> segmentMetadataList = listRemoteLogSegmentMetadata(
+                leaderTopicIdPartition, segmentCount, recordsPerSegment, segmentSize, epochEntries);
+        verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch);
+    }
 
-            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
-                    .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+    @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionTimeBreach segmentCount={0} deletableSegmentCount={1}")
+    @CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"})
+    public void testDeleteLogSegmentDueToRetentionTimeBreach(int segmentCount,
+                                                             int deletableSegmentCount)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        int recordsPerSegment = 100;
+        int segmentSize = 1024;
+        List<EpochEntry> epochEntries = Arrays.asList(
+                new EpochEntry(0, 0L),
+                new EpochEntry(1, 20L),
+                new EpochEntry(3, 50L),
+                new EpochEntry(4, 100L)
+        );
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch;
+
+        long localLogSegmentsSize = 512L;
+        long retentionSize = -1L;
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", 1L);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        long localLogStartOffset = (long) segmentCount * recordsPerSegment;
+        long logEndOffset = ((long) segmentCount * recordsPerSegment) + 1;
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(mockLog.localLogStartOffset()).thenReturn(localLogStartOffset);
+        when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+        when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);
 
-            newLeaderTask.run();
+        List<RemoteLogSegmentMetadata> segmentMetadataList = listRemoteLogSegmentMetadataByTime(
+                leaderTopicIdPartition, segmentCount, deletableSegmentCount, recordsPerSegment, segmentSize, epochEntries);
+        verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch);
+    }
 
-            assertEquals(200L, logStartOffset.get());
-            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
-            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+    private void verifyDeleteLogSegment(List<RemoteLogSegmentMetadata> segmentMetadataList,
+                                        int deletableSegmentCount,
+                                        int currentLeaderEpoch)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(segmentMetadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
+                .thenAnswer(invocation -> {
+                    int leaderEpoch = invocation.getArgument(1);
+                    return segmentMetadataList.stream()
+                            .filter(segmentMetadata -> segmentMetadata.segmentLeaderEpochs().containsKey(leaderEpoch))
+                            .iterator();
+                });
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(currentLeaderEpoch);
+        task.cleanupExpiredRemoteLogSegments();
+
+        ArgumentCaptor<RemoteLogSegmentMetadata> deletedMetadataCapture = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
+        verify(remoteStorageManager, times(deletableSegmentCount)).deleteLogSegmentData(deletedMetadataCapture.capture());
+        if (deletableSegmentCount > 0) {
+            List<RemoteLogSegmentMetadata> deletedMetadataList = deletedMetadataCapture.getAllValues();
+            RemoteLogSegmentMetadata expectedEndMetadata = segmentMetadataList.get(deletableSegmentCount - 1);
+            assertEquals(segmentMetadataList.get(0), deletedMetadataList.get(0));
+            assertEquals(expectedEndMetadata, deletedMetadataList.get(deletedMetadataList.size() - 1));
+            assertEquals(currentLogStartOffset.get(), expectedEndMetadata.endOffset() + 1);
         }
     }
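
The retention-size cases above pick retention.bytes so that exactly deletableSegmentCount of the oldest remote segments breach it. A quick, self-contained check of that arithmetic, illustrative only, with values taken from the "50, 23" case:

    public class RetentionSizeArithmetic {
        public static void main(String[] args) {
            int segmentCount = 50;
            int deletableSegmentCount = 23;
            long segmentSize = 1024L;
            long localLogSegmentsSize = 512L;

            // Total log size = remote segments + local-only segments.
            long totalSize = segmentCount * segmentSize + localLogSegmentsSize;
            // retention.bytes is chosen so the newest (segmentCount - deletable)
            // remote segments plus the local segments fit exactly within it.
            long retentionSize = (segmentCount - deletableSegmentCount) * segmentSize
                    + localLogSegmentsSize;
            long remainingBreachedSize = totalSize - retentionSize;

            // 51712 - 28160 = 23552 bytes = exactly 23 segments of 1024 bytes.
            System.out.println(remainingBreachedSize / segmentSize); // prints 23
        }
    }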
 
@@ -1706,9 +1736,22 @@ public class RemoteLogManagerTest {
                                                                         int recordsPerSegment,
                                                                         int segmentSize,
                                                                         List<EpochEntry> epochEntries) {
+        return listRemoteLogSegmentMetadataByTime(
+                topicIdPartition, segmentCount, 0, recordsPerSegment, segmentSize, epochEntries);
+    }
+
+    private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadataByTime(TopicIdPartition topicIdPartition,
+                                                                              int segmentCount,
+                                                                              int deletableSegmentCount,
+                                                                              int recordsPerSegment,
+                                                                              int segmentSize,
+                                                                              List<EpochEntry> epochEntries) {
         List<RemoteLogSegmentMetadata> segmentMetadataList = new ArrayList<>();
         for (int idx = 0; idx < segmentCount; idx++) {
             long timestamp = time.milliseconds();
+            if (idx < deletableSegmentCount) {
+                timestamp = time.milliseconds() - 1;
+            }
             long startOffset = (long) idx * recordsPerSegment;
             long endOffset = startOffset + recordsPerSegment - 1;
             List<EpochEntry> localTotalEpochEntries = epochEntries.isEmpty() ? totalEpochEntries : epochEntries;