You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "divijvaidya (via GitHub)" <gi...@apache.org> on 2023/04/14 14:17:33 UTC

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

divijvaidya commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1166622209


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -581,11 +588,18 @@ public void run() {
                 if (isLeader()) {
                     // Copy log segments to remote storage
                     copyLogSegmentsToRemote();
+                    // Cleanup/delete expired remote log segments
+                    cleanupExpiredRemoteLogSegments();

Review Comment:
   Should this operation be performed in a separate thread pool which can have a defined quota? (similar to how we perform cleaning for local log using separate cleaner/background threads).
   
   I am concerned that this may impact the rate of copy to remote if amount of cleaning is large. Also, it's perhaps better to have different scaling characteristics for cleaning from remote vs. copying. Copying maybe considered urgent since slowness in copying can potentially fill up disk whereas cleaning from remote may be a lower priority activity.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this topic partition without computing in each
+            // iteration. But the logic can become little complex and need to cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {

Review Comment:
   We don't need to calculate this for time based retention. Right? If yes, can we refactor the code here so that we perform size calculation (since it requires a full scan over all log segments) only when it's required i.e. for size based retention.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this topic partition without computing in each
+            // iteration. But the logic can become little complex and need to cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
+                epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (segmentMetadata.endOffset() < log.localLogStartOffset()) {
+                    totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes();
+                }
+            }
+
+            // This is the total size of segments in local log that have their base-offset > local-log-start-offset
+            // and size of the segments in remote storage which have their end-offset < local-log-start-offset.
+            long totalSize = log.validLocalLogSegmentsSize() + totalSizeEarlierToLocalLogStartOffset;
+
+            // All the leader epochs in sorted order that exists in remote storage
+            List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            long remainingBreachedSize = totalSize - log.config().retentionSize;
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(remainingBreachedSize);
+
+            boolean checkTimestampRetention = retentionMs > -1;
+            long cleanupTs = time.milliseconds() - retentionMs;
+            boolean checkSizeRetention = log.config().retentionSize > -1;
+            LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get();
+            Iterator<EpochEntry> epochEntryIterator = leaderEpochCache.epochEntries().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochEntryIterator.hasNext()) {
+                EpochEntry epochEntry = epochEntryIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochEntry.epoch);

Review Comment:
   we are only interested in segments in state COPY_SEGMENT_FINISHED or DELETE_SEGMENT_STARTED here. Right? (DELETE_SEGMENT_STARTED to clean up any stragglers)
   
   Can we make this more explicit by filtering on them?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);

Review Comment:
   please guard this update with `isLeader()`



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this topic partition without computing in each
+            // iteration. But the logic can become little complex and need to cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
+                epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (segmentMetadata.endOffset() < log.localLogStartOffset()) {
+                    totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes();
+                }
+            }
+
+            // This is the total size of segments in local log that have their base-offset > local-log-start-offset
+            // and size of the segments in remote storage which have their end-offset < local-log-start-offset.
+            long totalSize = log.validLocalLogSegmentsSize() + totalSizeEarlierToLocalLogStartOffset;
+
+            // All the leader epochs in sorted order that exists in remote storage
+            List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            long remainingBreachedSize = totalSize - log.config().retentionSize;
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(remainingBreachedSize);
+
+            boolean checkTimestampRetention = retentionMs > -1;
+            long cleanupTs = time.milliseconds() - retentionMs;
+            boolean checkSizeRetention = log.config().retentionSize > -1;
+            LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get();
+            Iterator<EpochEntry> epochEntryIterator = leaderEpochCache.epochEntries().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochEntryIterator.hasNext()) {
+                EpochEntry epochEntry = epochEntryIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochEntry.epoch);
+                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                    RemoteLogSegmentMetadata metadata = segmentsIterator.next();
+                    isSegmentDeleted =

Review Comment:
   could we check for `if (isCancelled() || !isLeader())` here again please to short circuit this expensive loop during shutdown.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this topic partition without computing in each

Review Comment:
   I would appreciate your response on https://lists.apache.org/thread/pc6x3xr4b8y84g43dcbdzh170kb5oz1v where we discussed that even with caching mechanism, warm up of the cache is going to slow down the copying. I agree that this can be discussed outside the scope of this PR but adding the above thread as FYI.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1378,20 +1416,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
+  private[log] def localRetentionMs: Long = {

Review Comment:
   I would suggest to have address local retention in a separate PR. We can limit this PR to handling remote log retention only.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -154,16 +154,41 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()
     maybeIncrementFirstUnstableOffset()
     initializeTopicId()
 
     logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+    info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " +
+      s"log end offset $logEndOffset")
   }
 
   def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
     logOffsetsListener = listener
   }
 
+  private def updateLocalLogStartOffset(offset: Long): Unit = {
+    _localLogStartOffset = offset
+
+    if (highWatermark < offset) {
+      updateHighWatermark(offset)
+    }
+
+    if (this.recoveryPoint < offset) {
+      localLog.updateRecoveryPoint(offset)
+    }
+  }
+
+  def updateLogStartOffsetFromRemoteTier(remoteLogStartOffset: Long): Unit = {
+    if (!remoteLogEnabled()) {
+      warn("Ignoring the call as the remote log storage is disabled")

Review Comment:
   Do we need this at a warn level?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.

Review Comment:
   Unreferenced segments are not only the ones which have a lower epoch that earliest known epoch, they could also be ones which have an epoch that is not part of active epoch chain. How are we handling that?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())

Review Comment:
   `isCancelled() || !isLeader()`
   
   Also, please add a log which can help operator understand that this was actually cancelled. 



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -595,6 +609,193 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private long remainingBreachedSize = 0L;
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(long remainingBreachedSize) {
+                this.remainingBreachedSize = remainingBreachedSize;
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size > 0
+                    if (checkSizeRetention && remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " +
+                            "${log.config.retentionSize} breach. Log size after deletion will be " +
+                            "${remainingBreachedSize + log.config.retentionSize}.");
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // There are two cases:
+            // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the
+            //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+            // 2) To remove the unreferenced segments.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                               boolean checkTimestampRetention,
+                                                               long cleanupTs,
+                                                               long retentionMs)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time " +
+                            "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled())
+                return;
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionMs = log.config().retentionMs;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this topic partition without computing in each
+            // iteration. But the logic can become little complex and need to cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
+                epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (segmentMetadata.endOffset() < log.localLogStartOffset()) {

Review Comment:
   The current logic may cause deletion of more data than anticipated.
   
   This is because it is possible to have remote segments satisfying this condition which are not part of the current leadership epoch chain. Calculation of totalSizeEarlierToLocalLogStartOffset may include these segments as well and hence, the calculation of totalSize will include local segments + all remote segments (< local log start offset). 
   The calculated TotalSize will be actually larger than the actual total size (where actual total size = size of remote + local log for the active epoch chain). This will lead to higher value of remainingBreachedSize than actual and hence, more data gets deleted than necessary.
   
   Is this making sense? Else I can provide an example to explain it better.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org